CountDownLatch 源码解析

了解了 AQS 的获取锁和释放锁的机制之后,countDownLatch 的逻辑看起来就比较清晰了。

他也是基于 AQS共享模式实现的一个计数器闭锁,他可以让一个或者多个线程等待,直到所有线程执行的操作完成。

简单的来说,他利用 AQS 的 state 作为计数器,将调用 await 方法的线程放入等待队列,然后 countDown 方法一方面让count 的值减 1,一方面当 count 减少到0 时,调用 doReleaseShared 方法释放锁,唤醒被 awaite方法阻塞的线程

下面来看下主要的几个方法,countDownLatch 自己实现了 tryAcquire 和 tryReleaseShared 方法。

1、await 方法

除了 tryAcquireShared 方法, await 调用的都是 AQS 中的方法。

  • 先调用 tryAcquireShared 获取资源,当计数器 count=0 时,直接返回,
  • 否则调用doAcquireSharedInterruptibly插入到同步队列的队尾
 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    //判断 count 是否为 0,是的话返回 1,否则返回 -1
   protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

         /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2、countDown

这里的 countDown 主要也是调用 AQS的releaseShared一系列方法。

关键不同在于tryReleaseShared

这个方法实现了对 count 的-1 操作,并通过 CAS 重新给 count 赋值。

当 count =0 时,调用 doReleaseShared方法唤醒 await 中被阻塞的线程继续执行。

  public void countDown() {
        sync.releaseShared(1);
    }


    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
     protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

最后附上对源码的大致翻译: