AQS 框架
AQS 框架概览
首先,我们通过下面的架构图来整体了解一下 AQS 框架:
上图中有颜色的为 Method,无颜色的为 Attribution。总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。CLH 即 Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
AQS 使用一个 volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 State 值的修改。AQS 使用一个 FIFO 的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态 waitStatus。
ReentrantLock 的基本实现可以概括为:先通过 CAS 尝试获取锁。如果此时已经有线程占据了锁,那就加入 AQS 队列并且被挂起。当锁被释放之后,排在 CLH 队列队首的线程会被唤醒,然后 CAS 再次尝试获取锁。在这个时候,如果:
- 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
- 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。
AQS 数据结构
先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。
解释一下几个方法和属性值的含义:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出 npe |
nextWaiter | 指向下一个处于 CONDITION 状态的节点(由于本篇文章不讲述 Condition Queue 队列,这个指针不多介绍) |
next | 后继指针 |
线程两种锁的模式:
模式 | 含义 |
---|---|
SHARED | 表示线程以共享的模式等待锁 |
EXCLUSIVE | 表示线程正在以独占的方式等待锁 |
waitStatus 有下面几个枚举值:
枚举 | 含义 |
---|---|
0 | 当一个 Node 被初始化的时候的默认值 |
CANCELLED | 为 1,表示线程获取锁的请求已经取消了 |
CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为-3,当前线程处在 SHARED 情况下,该字段才会使用 |
SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
同步状态 State
在了解数据结构后,接下来了解一下 AQS 的同步状态:State。AQS 中维护了一个名为 state 的字段,意为同步状态,是由 Volatile 修饰的,用于展示当前临界资源的获锁情况。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
下面提供了几个访问这个字段的方法:
方法名 | 描述 |
---|---|
protected final int getState() | 获取 State 的值 |
protected final void setState(int newState) | 设置 State 的值 |
protected final boolean compareAndSetState(int expect, int update) | 使用 CAS 方式更新 State |
这几个方法都是 Final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API 层。
同步器接口
从架构图中可以得知,AQS 提供了大量用于自定义同步器实现的 Protected 方法。自定义同步器实现的相关方法也只是为了通过修改 State 字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法:
- protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。
- protected boolean tryAcquire(int arg) 独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 True,失败则返回 False。
- protected boolean tryRelease(int arg) 独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 True,失败则返回 False。
- protected int tryAcquireShared(int arg) 共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- protected boolean tryReleaseShared(int arg) 共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 True,否则返回 False。
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。
加锁:
- 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
- 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
- AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
- tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。
解锁:
- 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
- Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
- Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
- 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。
通过上面的描述,大概可以总结出 ReentrantLock 加锁解锁时 API 层核心方法的映射关系。
ReentrantLock 中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。在非公平锁中,有一段这样的代码:
// java.util.concurrent.locks.ReentrantLock
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以 ReentrantLock 为例)。如果该方法返回了 True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。下面会详细解释线程是何时以及怎样被加入进等待队列中的。
AQS 应用
ReentrantLock 的可重入应用
ReentrantLock 的可重入性是 AQS 很好的应用之一,在了解完上述知识点以后,我们很容易得知 ReentrantLock 实现可重入的方法。在 ReentrantLock 里面,不管是公平锁还是非公平锁,都有一段逻辑。
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
if (c == 0) {
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
从上面这两段都可以看到,有一个同步状态 State 来控制整体可重入的情况。State 是 Volatile 修饰的,用于保证一定的可见性和有序性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
- State 初始化的时候为 0,表示没有任何线程持有锁。
- 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
- 解锁也是对这个字段-1,一直到 0,此线程对锁释放。
J.U.C 中的应用场景
除了上边 ReentrantLock 的可重入性的应用,AQS 作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了 JUC 中的几种同步工具,大体介绍一下 AQS 的应用场景:
同步工具 | 同步工具与 AQS 的关联 |
---|---|
ReentrantLock | 使用 AQS 保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock 记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
Semaphore | 使用 AQS 同步状态来保存信号量的当前计数。tryRelease 会增加计数,acquireShared 会减少计数。 |
CountDownLatch | 使用 AQS 同步状态来表示计数。计数为 0 时,所有的 Acquire 操作(CountDownLatch 的 await 方法)才可以通过。 |
ReentrantReadWriteLock | 使用 AQS 同步状态中的 16 位保存写锁持有的次数,剩下的 16 位用于保存读锁的持有次数。 |
ThreadPoolExecutor | Worker 利用 AQS 同步状态实现对独占线程变量的设置(tryAcquire 和 tryRelease)。 |
自定义同步工具
public class LeeLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
通过我们自己定义的 Lock 完成一定的同步功能。
public class LeeMain {
static int count = 0;
static LeeLock leeLock = new LeeLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
Links
- 2021-源码级深挖 AQS 队列同步器: 通过理解队列同步器的工作原理,对我们了解和使用这些工具类会有很大的帮助。