了解了 AQS 的获取锁和释放锁的机制之后,countDownLatch 的逻辑看起来就比较清晰了。
他也是基于 AQS共享模式实现的一个计数器闭锁,他可以让一个或者多个线程等待,直到所有线程执行的操作完成。
简单的来说,他利用 AQS 的 state 作为计数器,将调用 await 方法的线程放入等待队列,然后 countDown 方法一方面让count 的值减 1,一方面当 count 减少到0 时,调用 doReleaseShared 方法释放锁,唤醒被 awaite方法阻塞的线程。
下面来看下主要的几个方法,countDownLatch 自己实现了 tryAcquire 和 tryReleaseShared 方法。
1、await 方法
除了 tryAcquireShared 方法, await 调用的都是 AQS 中的方法。
- 先调用 tryAcquireShared 获取资源,当计数器 count=0 时,直接返回,
- 否则调用
doAcquireSharedInterruptibly
插入到同步队列的队尾
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//判断 count 是否为 0,是的话返回 1,否则返回 -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2、countDown
这里的 countDown 主要也是调用 AQS的releaseShared
一系列方法。
关键不同在于tryReleaseShared
。
这个方法实现了对 count 的-1 操作,并通过 CAS 重新给 count 赋值。
当 count =0 时,调用 doReleaseShared
方法唤醒 await 中被阻塞的线程继续执行。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
最后附上对源码的大致翻译: