CyclicBarrier 源码解析

CyclicBarrier通过使用条件变量和条件队列,达到了回环屏障的效果。

一言以蔽之:CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作(屏障),描述的多个线程内部相互等待的关系。然后可以重置他的状态让他可以被重用(回环)

我们看看他是怎么利用条件变量达到这两个效果的。

一、变量和对象

//generation 对象维护一个 boolean 类型的 broken 
private static class Generation {
        boolean broken = false;
    }
    /** 基于 ReentrantLock,采用默认的非公平锁策略*/
    private final ReentrantLock lock = new ReentrantLock();
    /** 条件变量也是用的 AQS 的 ConditionObject*/
    private final Condition trip = lock.newCondition();
    //参与的线程总数
    private final int parties;
    //每次计数完成时,会执行的动作,在 cyclicBarrier 的构造器中被赋值
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     * 处在等待状态的线程数,从 parties 到 0 倒数。
     在每一个新的 generation 或者当broken的时候,被重置为总线程数
     */
    private int count;

二、实现方法

来看一下他的核心方法:

1、await

  public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

2、这里的核心方法是 dowait

 /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //先获取可重入非公平锁
        lock.lock();
        try {
            final Generation g = generation;
            //若“当前generation已损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 breakBarrier 调用了 signalAll
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            //对 count 进行-1
            int index = --count;
            //当 count=0 的时候,这里的核心方法是 nextGeneration
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //执行初始化cyclicBarrier时传递的Runnable 任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true; 
                    //激活其他因调用 await 方法被阻塞的线程,并重置 CyclicBarrier
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //当 count!=0 的时候,则计数器还没结束,这里的核心是 trip.await()
            for (;;) {
                try {
                    if (!timed)
                        //将当前线程放入条件队列挂起
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果等待过程中,线程被中断,则执行下面的函数。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                // 如果“当前generation已经损坏”,则抛出异常。
                if (g.broken)
                    throw new BrokenBarrierException();

                // 如果“generation已经换代”,则返回index。
                if (g != generation)
                    return index;

                //如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放独占锁,并唤醒后续线程
            lock.unlock();
        }
    }

那么我们来看下nextGeneration和 trip.wait到底做了什么事:

3、nextGeneration

这里可以看到调用了 trip.signalAll,然后对 count 和 generation 进行了重置

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

signalAll()是 AQS 中conditionObject 条件变量的方法,从注释来看,他是将所有条件队列中的线程都移动到 AQS 的等待同步队列中去,通过调用 doSignalAll(),循环遍历条件队列,然后用transferForSignal()转移到同步队列中去,转移的方法解释详见 AQS 那一篇。

   /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         * 
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
         /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            //通过循环遍历,将所有的线程都转移到等待队列中
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

因此 nextGeneration 方法就是将当前条件队列中的所有线程转移到同步队列中去,并重置 cyclicBarrier

4、trip.await

这个方法也是 AQS 中条件变量的等待方法。我们在 AQS 中分析过,这个方法的作用就是

  • 1、当线程用 lock()方法获取锁之后,调用条件变量的 await 方法时,先调用addConditionWaiter()在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入等待队列末尾。

  • 2、然后释放当前线程获取的锁,并被阻塞挂起

源码就不放了,和 AQS 中一样。

那么我们再回过头来看下 CyclicBarrier 的 dowait 方法逻辑:

现在有线程 A 和线程 B两个线程在执行,然后分别调用 cyclicBarrier.await

  1. 线程 A 调用 await 方法,获取锁,此时 count!=0,进入条件队列,释放锁并挂起
  2. 线程 B 调用 await 方法,获取锁,此时 count=0,调用nextGeneration,将线程 A 从条件队列里转移到同步队列并设置 ws 为 SIGNAL继续运行,然后线程 B释放锁。
    • Lock.unlock 调用的是 ReentrantLock 的 unlock 方法,释放锁成功之后,调用 unparkSuccessor 唤醒线程 A
  3. 然后线程 A 和 B 继续向下运行。如果再次调用 await,则重复上述步骤。

5、与 countDownLatch 的区别

CyclicBarrier 和 CountDownLatch 有什么不同之处呢?

  • 最主要的区别就是 cyclicBarrier 可以重用

  • 因为 CyclicBarrier基于 ReentrantLock, 是在 AQS 的 state 状态之外,重新维护了一个 count 和 parties,并且为 count 设置了重置的方法。通过将线程阻塞到条件队列,然后将线程转移到等待队列进行唤醒。因此可以在一个线程中被多次调用。获取锁释放锁和唤醒线程的操作,都是交给 ReentrantLock 和条件变量来实现的,底层还是 AQS。

  • 而 CountDownLatch 直接基于 AQS,将 AQS 的 state 作为计数器,没有重置机制,也没有使用条件变量。