AQS:AbstractQueuedSynchronizer
抽象同步队列,提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,并发包中锁的底层实现就是利用 AQS 来实现的,比如 ReentrantLock,CountDownLatch,Semaphore,CyclicBarrier,Worker in ThreadPoolExecutor。
一、主要变量及对象
1、state
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
- AQS 中维护了一个 volatile 变量 state,用来表示同步状态,
- 在不同的子类中可以定义不同的值,
- 通过 get、set 和 CAS 操作来保证了读取和设置操作的原子性。
2、Node & CHL wait queue
AQS 中维护了一个变种的 CHL lock 等待队列,以及队列节点 Node。
CLH CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋
static final class Node {
/** 用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的 */
static final Node SHARED = new Node();
/** 用来标记该线程是获取独占资源时被阻塞挂起后放入 AQS 队列的 */
static final Node EXCLUSIVE = null;
//以下四个都是 waitStatus 的状态值
/**表示线程被取消了 */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking
表示处于这个状态的当前节点在资源被释放或者取消时必须唤醒他的后继节点。*/
static final int SIGNAL = -1;
/** 表示线程在条件队列中 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
表示释放共享资源时需要通知其他节点
*/
static final int PROPAGATE = -3;
/**
* 状态变量,只有以下几种值:
* SIGNAL -1 : 线程需要被唤醒。这个节点的后继节点处于阻塞状态,所以处于-1 状态的当前节点在被
释放或者取消时必须唤醒他的后继节点。
为了避免竞争,acquire 方法必须先表明他们需要一个
signal,然后进行原子性获取操作的尝试,如果失败的话,阻塞
* CANCELLED 1 : node 由于超时或者中断被取消。一个有着取消状态节点的线程不会再获得锁。
*
* CONDITION -2 : 节点处在条件队列中,直到被转移到同步等待队列中才会被使用,
同时 status 会被设置为 0
*
* PROPAGATE -3 :一个释放共享锁的节点应该通知其他节点,这是在doReleaseShared方法中被
设置的,为了确保传播的继续,即使有其他操作的介入。
* 0: None of the above
*
* 这个变量初始化的时候是 0,通过 CAS 操作来修改
*/
volatile int waitStatus;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
使这个节点进入条件队列的线程,在构造函数中初始化,在使用完之后变为 null
*/
volatile Thread thread;
........
}
3、Condition
条件变量用于配合线程之间的唤醒和阻塞等同步操作。
public class ConditionObject implements Condition, java.io.Serializable {...}
- AQS 中的内部类ConditionObject 是 Condition 接口的一个实现。可以访问 AQS 内部的变量和方法。
- 每个条件变量内部都维护了一个条件队列,用来存放调用 condition.await()时被阻塞的线程。和 AQS 的等待同步队列不是一回事。
实现原理:
1、当线程用 lock()方法获取锁之后,调用条件变量的 await 方法时,先调用addConditionWaiter()
在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入等待队列末尾。
2、然后释放当前线程获取的锁,并被阻塞挂起
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建新的节点,并插入队列末尾
Node node = addConditionWaiter();
//释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//调用 park 方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Adds a new waiter to wait queue.
将一个等待线程添加到等待队中,返回一个新的等待节点
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清空[条件等待队列]中节点状态不为 CONDITION 的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个waitStatus=CONDITION 的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
//赋值给最末节点并返回
lastWaiter = node;
return node;
}
3、当另外一个线程在获取锁之后调用signal()
方法时,在内部会把条件队列里第一个节点移除并放入 AQS 的阻塞队列中,然后激活这个线程。
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
将等待时间最长的线程从 Condition 的等待队列中,移动到 AQS 获取锁的等待队列中
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 设置新的头节点,并将节点从[条件等待队列]中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 直到将该节点加入[同步等待队列]或[条件等待队列]为空时跳出循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//若不能修改节点状态,说明该节点已被取消
// 如果节点node的状态不是Node.CONDITION,或者更新状态失败,
// 说明该node节点已经插入到同步队列中,所以直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将节点添加进[同步等待队列]
Node p = enq(node);
int ws = p.waitStatus;
//如果该节点的状态为 cancel 或者修改waitStatus失败,则直接唤醒。
// 如果前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。
// 就说明节点p之后也不会发起唤醒下一个node节点线程的操作,
// 所以这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
二、AQS 的两种模式
AQS 定义两种资源共享方式:
Exclusive
:独占型 比如 ReentrantLockShare
:共享型 比如Semaphore 和CountDownLatch
独占模式的主要方法:
1、aquire
如果当前线程获取到锁,直接返回,否则进入等待队列,直到获取到锁为止,而且忽略中断的影响。
方法逻辑:
- tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中获取资源,自旋然后挂起,等到获取到资源后才返回。如果在整个等待过程中被中断过,则最后返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
互斥模式下,忽略中断的acquire方法。通过至少调用一次 tryAcquire 返回 success 来实现。
失败的话线程进入等待队列,可能重复阻塞或非阻塞,直到成功调用 tryAcquire。
这个方法可以用来实现 Lock.lock
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2、tryAcquire
注释大概意思是:
- 试图在互斥模式下获取资源。这个方法应该先查询一下对象是否允许被获取。
- 这个方法每次都会被调用 acquire 的线程执行,如果这个方法执行失败,acquire 方法会将 thread 加入到等待队列,直到被其他线程的调用 signal 释放。该方法可以用于实现Lock中的tryLock()方法。
具体实现由自定义的扩展了AQS的同步类来实现。AQS在这里只负责定义了一个公共的方法框架。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3、addWaiter
- 为当前线程创建节点,并添加到等待队列的尾部,返回当前线程所在节点。
- 如果队列不为空,则以通过
compareAndSetTail
方法以CAS的方式将当前线程节点加入到等待队列的末尾。否则,通过enq(node)方法初始化一个等待队列,并返回当前节点。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//通过入参 mode 来创建节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//当队列不为空
if (pred != null) {
node.prev = pred;
//通过 CAS 将当前线程的节点添加到等待队列尾部
//如果 prevd 和当前 tail 相等,则替换为 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//当队列为空,初始化队列并插入 node 到尾部
enq(node);
return node;
}
//这里的队列没有用一个容器类去存放,而是直接用 node 相互关联
private Node enq(final Node node) {
//死循环,以自旋方式进行,直到成功加入队尾为止。
for (;;) {
Node t = tail;
//当等待队列为空时,进行初始化
// 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 和 上个方法逻辑一样,将新节点追加到 tail 节点后面,并更新队列的 tail 为新节点。
// 只不过这里是死循环的,失败了还可以再来 。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4、acquireQueued
acquireQueued()
用于队列中的线程自旋地以独占且不可中断的方式获取锁(acquire),直到拿到锁之后再返回。
先获取当前节点的前一节点p,如果p是head的话就再进行一次tryAcquire(arg)操作,如果成功就返回,否则就执行shouldParkAfterFailedAcquire、parkAndCheckInterrupt来达到阻塞效果;然后等待前一个节点释放锁之后对自己进行唤醒。
shouldParkAfterFailedAcquire()
方法通过对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作。-
parkAndCheckInterrupt()
:该方法让线程去休息,真正进入等待状态。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} 返回在等待时是否被中断
*/
参数 Node 是 addWaiter 返回的插入的节点,arg 是 acquire 接收的 arg,表示请求的数量。
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源,默认false
boolean failed = true;
try {
//标记等待过程是否被中断
boolean interrupted = false;
//死循环 自旋不断尝试获取锁,直到获取成功
for (;;) {
//这时候 node 是队列最尾部的节点,
// 如果上一个节点是 head ,就尝试获取锁
// 如果 获取成功,就将当前节点设置为 head,注意 head 节点是永远不会唤醒的。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取锁成功之后,将当前节点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* 检查并且更新获取失败的节点状态,当线程应该阻塞的时候返回 true。
这是在所有 acquire 循环中主要的 signal 控制。需要 pred==node.prev
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} 如果线程需要阻塞的话返回 true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果他的上一个节点的 ws 是 SIGNAL,他就需要阻塞。
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
前一个节点已经设置状态为请求 release 节点去唤醒他了,所以当前节点可以安全的挂起
也就是只有当前驱节点为SIGNAL时这个线程才可以进入等待状态。
*/
return true;
//前一个节点线程被取消了,跳过该节点并且重试
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus 必须是 0 或者 PROPAGATE. 表明需要被唤醒,但是还没被挂起
调用者必须重新确认在挂起之前他不能获取到资源
*/
// 如果没有取消 || 0 || CONDITION || PROPAGATE,那么就将前任的 ws 设置成 SIGNAL.
// 为什么必须是 SIGNAL 呢?
// 答:希望自己的上一个节点在释放锁的时候,通知自己(让自己获取锁)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 挂起线程然后检查是否被中断
*
* @return {@code true} 被中断的话返回 true
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
5、release
release 的方法逻辑就比较简单,调用 tryRelease 成功后,直接调用 unparkSuccessor,唤醒后继节点中的线程去获得锁,让其继续acquire状态。这里的 tryRelease 方法也是需要子类去具体实现的。
release(int)
方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒头节点后继节点的线程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
如果 ws 是负数的话,将 ws 设置为 0 清除信号。表示,他已经释放过了。不能重复释放。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
需要唤醒的线程在后继节点中,一般来说是下一个节点,
但是如果遇到取消或者为 null 的情况,那么就从尾部节点开始从后往前遍历,直到找到一个没有取消的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//调用 unpark 方法唤醒线程
LockSupport.unpark(s.thread);
}
与acquire()方法中的tryAcquire()类似,tryRelease()方法也是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
综上,独占模式的整个流程如下:
共享模式的主要方法:
1、acquireShared
获取共享资源的顶层入口。方法逻辑和 acquire 类似:
- 先调用 tryAcquireShared 试图获取锁,失败的话调用 doAcquireShared
- 如果
tryAcquireShared
返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。最后、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁
- 如果
- doAcquireShared先在队尾插入新节点,然后和 acquiredQueued 相同逻辑进行自旋获取锁
-
不同之处是,当节点中的线程获取资源之后,如果资源还足够的话,他会接着唤醒后续节点去获取
通过调用
setHeadAndPropagate
调用doReleaseShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//在队列末尾插入新节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//获取同步状态
int r = tryAcquireShared(arg);
//大于 0 表示获取到了
if (r >= 0) {
//设置为头结点,并且有多余资源的话一起唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果因为中断醒来的话,就设置中断标记位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//判断线程是否可以进行休息如果可以休息就调用park方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
* 设置队列的头节点, 并检查后继节点是否在共享模式等待被唤醒,如果是的话,就唤醒他们。
* @param node the node 当前成功获取锁的节点
* @param propagate t tryAcquireShared返回的值,可能大于0 也可能等于0
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//设置新的头节点,即把当前获取到锁的节点设置为头节点
//注:这里是获取到锁之后的操作,不需要并发控制
setHead(node);
//这里意思有两种情况是需要执行唤醒操作
//1.propagate > 0 表示调用方指明了后继节点需要被唤醒
//2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
2、releaseShared
该方法会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
那么这里和独占模式有什么区别呢?
独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。
而且如果第一次唤醒一个后继节点之后,如果头结点发生变化,说明其他线程获取到了锁,那么就继续循环,传递唤醒动作,这样的话就可以唤醒不止一个节点。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//一直循环设置 h 的 ws 为 0
//这里需要用控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒后续节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head) // loop if head changed
break;
}
}
三、总结
AQS在并发中是一个非常重要的基础类,它定义了很多同步组件需要的方法。通过这些方法开发者可以简单的实现一个相关的锁。我们详解了独占和共享两种模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码,相信大家都有一定认识了。值得注意的是,acquire()和acquireSahred()两种方法下,线程在等待队列中都是忽略中断的。
AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是,这里相应的源码跟acquire()和acquireSahred()差不多,这里就简单阐述一下。