CyclicBarrier通过使用条件变量和条件队列,达到了回环屏障的效果。
一言以蔽之:CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作(屏障),描述的多个线程内部相互等待的关系。然后可以重置他的状态让他可以被重用(回环)。
我们看看他是怎么利用条件变量达到这两个效果的。
一、变量和对象
//generation 对象维护一个 boolean 类型的 broken
private static class Generation {
boolean broken = false;
}
/** 基于 ReentrantLock,采用默认的非公平锁策略*/
private final ReentrantLock lock = new ReentrantLock();
/** 条件变量也是用的 AQS 的 ConditionObject*/
private final Condition trip = lock.newCondition();
//参与的线程总数
private final int parties;
//每次计数完成时,会执行的动作,在 cyclicBarrier 的构造器中被赋值
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
* 处在等待状态的线程数,从 parties 到 0 倒数。
在每一个新的 generation 或者当broken的时候,被重置为总线程数
*/
private int count;
二、实现方法
来看一下他的核心方法:
1、await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
2、这里的核心方法是 dowait
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//先获取可重入非公平锁
lock.lock();
try {
final Generation g = generation;
//若“当前generation已损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 breakBarrier 调用了 signalAll
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//对 count 进行-1
int index = --count;
//当 count=0 的时候,这里的核心方法是 nextGeneration
if (index == 0) { // tripped
boolean ranAction = false;
try {
//执行初始化cyclicBarrier时传递的Runnable 任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//激活其他因调用 await 方法被阻塞的线程,并重置 CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//当 count!=0 的时候,则计数器还没结束,这里的核心是 trip.await()
for (;;) {
try {
if (!timed)
//将当前线程放入条件队列挂起
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果“当前generation已经损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果“generation已经换代”,则返回index。
if (g != generation)
return index;
//如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放独占锁,并唤醒后续线程
lock.unlock();
}
}
那么我们来看下nextGeneration和 trip.wait到底做了什么事:
3、nextGeneration
这里可以看到调用了 trip.signalAll,然后对 count 和 generation 进行了重置
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
signalAll()
是 AQS 中conditionObject 条件变量的方法,从注释来看,他是将所有条件队列中的线程都移动到 AQS 的等待同步队列中去,通过调用 doSignalAll()
,循环遍历条件队列,然后用transferForSignal()
转移到同步队列中去,转移的方法解释详见 AQS 那一篇。
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
//通过循环遍历,将所有的线程都转移到等待队列中
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
因此 nextGeneration 方法就是将当前条件队列中的所有线程转移到同步队列中去,并重置 cyclicBarrier
4、trip.await
这个方法也是 AQS 中条件变量的等待方法。我们在 AQS 中分析过,这个方法的作用就是
- 1、当线程用 lock()方法获取锁之后,调用条件变量的 await 方法时,先调用
addConditionWaiter()
在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入等待队列末尾。 -
2、然后释放当前线程获取的锁,并被阻塞挂起
源码就不放了,和 AQS 中一样。
那么我们再回过头来看下 CyclicBarrier 的 dowait 方法逻辑:
现在有线程 A 和线程 B两个线程在执行,然后分别调用 cyclicBarrier.await
- 线程 A 调用 await 方法,获取锁,此时 count!=0,进入条件队列,释放锁并挂起
- 线程 B 调用 await 方法,获取锁,此时 count=0,调用nextGeneration,将线程 A 从条件队列里转移到同步队列并设置 ws 为 SIGNAL继续运行,然后线程 B释放锁。
- Lock.unlock 调用的是 ReentrantLock 的 unlock 方法,释放锁成功之后,调用 unparkSuccessor 唤醒线程 A
- 然后线程 A 和 B 继续向下运行。如果再次调用 await,则重复上述步骤。
5、与 countDownLatch 的区别
CyclicBarrier 和 CountDownLatch 有什么不同之处呢?
-
最主要的区别就是 cyclicBarrier 可以重用
-
因为 CyclicBarrier基于 ReentrantLock, 是在 AQS 的 state 状态之外,重新维护了一个 count 和 parties,并且为 count 设置了重置的方法。通过将线程阻塞到条件队列,然后将线程转移到等待队列进行唤醒。因此可以在一个线程中被多次调用。获取锁释放锁和唤醒线程的操作,都是交给 ReentrantLock 和条件变量来实现的,底层还是 AQS。
-
而 CountDownLatch 直接基于 AQS,将 AQS 的 state 作为计数器,没有重置机制,也没有使用条件变量。