AQS简介

AQS的全称是AbstractQueuedSynchronizer,类内部定义了一套多线程访问共享资源的同步器框架,Java许多同步类的实现都依赖于它,比如常用的ReentrantLock、Semaphore、CountDownLatch等,我们也可以利用AQS自己实现一个锁。

AQS类内部的核心为volatile int state(共享资源)CLH线程等待队列(阻塞队列),整个AQS类内部大量的方法都是围绕state、CLH队列在处理逻辑。

state

state作为共享资源被应用在多线程竞争上,自带的volatile关键字可以保证可见性、有序性,在搭配CAS使用后可以保证操作的原子性。state初始状态为0,线程使用CAS对state+1成功后持有锁,后续每次重入state+1、退出state-1,state递减为0时代表锁释放。

CLH队列

当线程竞争失败后会被封装成Node节点加入CLH队列,CLH队列在AQS中是以前驱节点(head)、后驱节点(tail)俩个成员构成的Node类型链表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

// 前驱节点
private transient volatile Node head;

// 后驱节点
private transient volatile Node tail;

// 静态内部类Node
static final class Node {

/** 共享锁 */
static final Node SHARED = new Node();

/** 独占锁 */
static final Node EXCLUSIVE = null;

/** 表示线程已被取消 */
static final int CANCELLED = 1;

/** 表示后续线程需要取消阻塞 */
static final int SIGNAL = -1;

/** 表示线程在条件下等待 */
static final int CONDITION = -2;

/** 表示下一个获取共享应无条件传播 */
static final int PROPAGATE = -3;

/**
* 节点等待状态
* 等于0:该节点尚未被初始化完成
* 大于0:说明该线程中断或者等待超时,需要移除该线程
* 小于0:该线程处于可以被唤醒的状态
*/
volatile int waitStatus;

/** 前驱节点 */
volatile Node prev;

/** 后继节点 */
volatile Node next;

/** 获取同步状态的线程 */
volatile Thread thread;

/** 将单向列表变成双向列表 */
Node nextWaiter;

// 是否为共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前继节点,没有就抛出异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

// 无参构造器
Node() {
}

// 构造器
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 构造器
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

Node内部类主要通过waitStatus来表示状态,主要有五种状态:

状态 状态值 描述
INITAL 0 初始状态
CANCELLED 1 此节点的后继节点(或即将)被阻塞,因此当前节点在释放或取消时必须取消对其后继节点的阻塞
SIGNAL -1 此节点的后继节点(或将很快)被阻塞(通过park),因此当前节点在释放或取消时必须取消对其后继节点的阻塞。为了避免争用,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞
CONDITION -2 节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中
PROPAGATE -3 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态

链表入列
链表的入列采用CAS方式进行,前驱节点与后驱节都是被volatile修饰的,因此使用CAS修改可以保证绝对安全,在enq方法中AQS使用死循环保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private Node addWaiter(Node mode) {
// 新建Node
Node node = new Node(Thread.currentThread(), mode);
// CAS快速尝试添加尾节点(侥幸心理,万一成功了呢)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}

private Node enq(final Node node) {
//多次尝试,直到成功为止
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

当线程被封装成Node节点成功追加到等待队列尾部后,为了节约CPU资源就需要将当前线程挂起了(被阻塞的线程如果支持可中断并且被中断,自动唤醒并抛出中断异常):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
final boolean acquireQueued(final Node node, int arg) {

// 获取资源是否失败标记
boolean failed = true;
try {

//标记等待过程中是否被中断过
boolean interrupted = false;

// 自旋
for (;;) {

//拿到前驱节点
final Node p = node.predecessor();

/*
* 如果前驱是head,说明自己排在第二位,有可能马上就被执行
* 所以再次尝试tryAcquire()获取,如果失败就挂起等待
* 当然有可能是第一位搞完了释放资源唤醒自己,也有可能被interrupt
*/
if (p == head && tryAcquire(arg)) {

// 获取到资源后,把自己设置为head,也就是说head指向的永远是当前拿到资源的
setHead(node);

// 断绝与前驱节点的联系,方便被GC回收
p.next = null;

// 成功获取资源后将失败标记为false
failed = false;

// 返回等待过程中是否被中断过
return interrupted;
}

/*
* 先去检查自己是否真的可以被挂起了,如果不符合条件会进入下一次循环直到符合为止
* 调用park()方法将自己挂起,直到被唤醒
* 唤醒后会返回是否被中断标记,方便下次return出去
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {

// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),取消结点在队列中的等待。
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire方法,检查自己是否真的可以被挂起了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

// 拿到前驱节点的状态
int ws = pred.waitStatus;

// 如果前驱节点的状态是SIGNAL,那么前驱节点执行完会自动唤醒自己,放心的将自身挂起就好了
if (ws == Node.SIGNAL)
return true;

// 如果前驱节点执行过程中放弃了(超时或者其他的),一直往前找,直到找到正常等待的状态节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt方法,就是挂起:

1
2
3
4
5
6
7
8
private final boolean parkAndCheckInterrupt() {

//调用park()使线程进入waiting状态
LockSupport.park(this);

//如果执行到这里,说明被唤醒,查看自己是不是被中断的。
return Thread.interrupted();
}

内部方法

主要方法

  • acquire(int arg):独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列
  • acquireInterruptibly(int arg):独占式获取同步状态(同上),如果被打断直接抛异常
  • tryAcquire(int arg):独占式获取同步状态(供开发者重写)
  • tryAcquireNanos(int arg,long nanosTimeout):独占式获取同步状态,增加超时限制
  • release(int arg):独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒
  • tryRelease(int arg):独占式释放同步状态(供开发者重写)
  • acquireShared(int arg):共享式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列
  • acquireSharedInterruptibly(int arg):共享式获取同步状态(同上),如果被打断直接抛异常
  • tryAcquireShared(int arg):共享式获取同步状态(供开发者重写)
  • tryAcquireSharedNanos(int arg,long nanosTimeout):共享式获取同步状态,增加超时限制
  • releaseShared(int arg):共享式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒
  • tryReleaseShared(int arg):共享式释放同步状态(供开发者重写)
  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占

方法虽然很多,不过很容易进行区分

  • 首先争夺锁的方式有独占和共享
  • 每种方式又包含加锁、释放锁方法
  • 加锁的方法又分为直接加锁、超时加锁、中断加锁
  • 直接加锁与中断加锁内部调用对应try开头的加锁方法处理
  • try开头的加锁方法采用模板模式,具体实现由开发者自己重写实现
  • 最后一个是否独占并占用的查询

共享资源获取释放
在需要开发者重写的获取资源方法中,独占式获取资源方法tryAcquire(int arg)返回值为boolean类型,仅仅需要告诉调用者获取成功还是失败即可

而共享式获取资源方法acquireShared(int arg)返回int类型,大于等于零表示成功,小于零则表示失败,因为是共享所以允许多个线程访问获取,但有些时候我们需要限制访问数量。这就可以设置一个阈值,每次有线程进来时阈值-1消耗,当消耗为零的时候,后续线程就不允许访问了,直接进入等待队列。

同样的,共享式资源的释放相比较独占式逻辑也有不同,除了唤醒后继节点,还需要将阈值+1

独占式源码解析

acquire(获取锁)

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

先尝试用重写的tryAcquire(arg)方法,由于独占锁同一时刻只允许一个线程持有,这就需要开发者在重写方法时要利用好state属性,确保拿到锁的线程返回true,在没有释放前其他线程访问返回false。如果返回false就将线程封装成一个独占式锁加入队列中,紧接着尝试挂起线程。

release(释放锁)

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

先尝试调用重写的tryRelease(int arg)释放锁,如果成功后判断自身状态,如果节点状态不等于0(也就是还没退出等待队列),调用unparkSuccessor方法释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void unparkSuccessor(Node node) {

// 获取当前节点的状态
int ws = node.waitStatus;

// 如果小于0,使用CAS设置为0,0代表退出等待队列
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 获取后继节点
Node s = node.next;

// 如果没有后继节点,或者后继节点状态大于0,也就是说已经退出队列了
if (s == null || s.waitStatus > 0) {

// 方便GC回收
s = null;

// 不停的往后面找,直到找到状态正常的为止
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}

// 如果找到了就唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

这个方法的逻辑也很简单,使用CAS方式将自身节点状态设置为0,紧接着根据自身的waitStatus判断后继节点是否需要被唤醒,如果后继节点因为响应中断等情况放弃了,就继续往后找,直到找到可以背唤醒的节点线程。

acquireInterruptibly(获取锁并支持中断)

1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

先判断是否中断状态,如果是直接抛异常。如果不是中断状态,进入doAcquireInterruptibly(arg)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

代码逻辑与acquire几乎一致,AQS阻塞等待逻辑的老套路就是,如果等待线程的前驱节点不是head则使用park()挂起,在parkAndCheckInterrupt()中实现,紧接着下一行返回中断状态。处于挂起状态的线程如果被中断,会立刻结束挂起状态,因此在上面的代码中会满足第二个if判断,抛出中断异常。这里有个疑问,如果前驱节点是head,中断没做任何处理?

doAcquireNanos(获取锁并支持中断、超时)
进入方法前获取当前时间戳,每次循环再次获取当前时间戳用差值判断是否超时,就算是被挂起的,也是调用park(this,nanosTimeout)进行挂起,到达超时时间直接跳出自旋。其他逻辑和doAcquireInterruptibly()一致。

共享式源码解析

acquireShared(获取锁)

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

先尝试用重写的acquireShared(arg)方法,由于共享锁同一时刻时允许多个线程进行访问的,AQS将重写方法设计为支持同一时刻最大访问限制数,返回值的int类型,表示如果当前线程进入访问后还能剩余多少访问数,如果为负数证明已经没有访问名额了,只能阻塞等待。

releaseShared(释放锁)

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

先尝试用重写的tryReleaseShared(arg)释放锁,加锁的时候是对state(访问限制数)-1,那么释放锁自然是加回来,这时有可能很多线程都在释放锁,因此在重写方法里加值要使用CAS方式。释放成功就代表有资源空闲出来,调用doReleaseShared方法唤醒后续节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

在自旋的阶段,每一次循环的过程都是首先获得头结点,如果头结点不为空且不为尾结点(阻塞队列里面只有一个结点),那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0,因为是要释放资源了,它也不需要做什么了,所以转变为初始状态,然后去唤醒后继结点unparkSuccessor(h),如果结点状态一开始就是0,那么就给他转换成PROPAGATE状态,保证在后续获取资源的时候,还能够向后面传播(这一块不明白)。

tryAcquireSharedNanos(获取锁并支持中断、超时)
进入方法前获取当前时间戳,每次循环再次获取当前时间戳用差值判断是否超时,就算是被挂起的,也是调用park(this,nanosTimeout)进行挂起,到达超时时间直接跳出自旋。其他逻辑和tryAcquireShared()一致。

简单应用

看懂AQS的原理机制后,我们可以尝试自己写一个不可重入锁,首先定义一下锁资源(AQS中的state)的含义,0表示未被加锁,1表示已经加锁。直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class CustomLock {

private Sync sync;

// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {

// 判断是否锁定状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 获取资源
@Override
protected boolean tryAcquire(int arg) {

// 使用CAS修改状态,如果成功设置当前资源为独占资源
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 释放资源
@Override
protected boolean tryRelease(int arg) {

//既然释放,肯定就是已占有状态了,为了代码健壮一点加层判断
if (getState() == 0)
throw new IllegalMonitorStateException();

// 清空独占记录
setExclusiveOwnerThread(null);

// 释放共享资源,tryRelease还没执行完,线程仍然持有锁,因此不需要CAS修改
setState(0);

return true;
}
}

// 在自定义加锁对象创建时,为其初始化一个同步器
public CustomLock(){
sync = new Sync();
}

// 加锁
public void lock() {
sync.acquire(1);
}

// 单次加锁尝试
public boolean tryLock() {
return sync.tryAcquire(1);
}

// 释放锁
public void unlock(){
sync.release(1);
}

// 锁是否处于加锁状态
public boolean isLocked(){
return sync.isHeldExclusively();
}
}

可重入锁在加锁、释放锁的时候需要对state进行加减操作,并且确保退出的时候state为零,再此期间其他线程访问时如果state大于等于零,则获取锁失败。由于这段代码设计的是不可重入锁,不需要记录次数,仅仅有加锁(1)和未加锁(0)俩中状态,因此lock()、tryLock()、unlock()方法传参随便写都可以,在内部类Sync重写AQS方法中已经写死。

利用AQS我们可以实现很多种同步机制,比如CountDownLatch、CyclicBarrier、Semaphore、Lock诸多实现类,都是利用AQS来实现。

评论