AQS简介 AQS的全称是AbstractQueuedSynchronizer,类内部定义了一套多线程访问共享资源的同步器框架,Java许多同步类的实现都依赖于它,比如常用的ReentrantLock、Semaphore、CountDownLatch等,我们也可以利用AQS自己实现一个锁。
AQS类内部的核心为volatile int state(共享资源) 和CLH线程等待队列(阻塞队列) ,整个AQS类内部大量的方法都是围绕state、CLH队列在处理逻辑。
state state作为共享资源被应用在多线程竞争上,自带的volatile关键字可以保证可见性、有序性,在搭配CAS使用后可以保证操作的原子性。state初始状态为0,线程使用CAS对state+1成功后持有锁,后续每次重入state+1、退出state-1,state递减为0时代表锁释放。
CLH队列 当线程竞争失败后会被封装成Node节点加入CLH队列,CLH队列在AQS中是以前驱节点(head)、后驱节点(tail)俩个成员构成的Node类型链表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 // 前驱节点 private transient volatile Node head; // 后驱节点 private transient volatile Node tail; // 静态内部类Node static final class Node { /** 共享锁 */ static final Node SHARED = new Node(); /** 独占锁 */ static final Node EXCLUSIVE = null; /** 表示线程已被取消 */ static final int CANCELLED = 1; /** 表示后续线程需要取消阻塞 */ static final int SIGNAL = -1; /** 表示线程在条件下等待 */ static final int CONDITION = -2; /** 表示下一个获取共享应无条件传播 */ static final int PROPAGATE = -3; /** * 节点等待状态 * 等于0:该节点尚未被初始化完成 * 大于0:说明该线程中断或者等待超时,需要移除该线程 * 小于0:该线程处于可以被唤醒的状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; /** 将单向列表变成双向列表 */ Node nextWaiter; // 是否为共享节点 final boolean isShared() { return nextWaiter == SHARED; } // 获取前继节点,没有就抛出异常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 无参构造器 Node() { } // 构造器 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 构造器 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
Node内部类主要通过waitStatus来表示状态,主要有五种状态 :
状态
状态值
描述
INITAL
0
初始状态
CANCELLED
1
此节点的后继节点(或即将)被阻塞,因此当前节点在释放或取消时必须取消对其后继节点的阻塞
SIGNAL
-1
此节点的后继节点(或将很快)被阻塞(通过park),因此当前节点在释放或取消时必须取消对其后继节点的阻塞。为了避免争用,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞
CONDITION
-2
节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中
PROPAGATE
-3
与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
链表入列 链表的入列采用CAS方式进行,前驱节点与后驱节都是被volatile修饰的,因此使用CAS修改可以保证绝对安全,在enq方法中AQS使用死循环保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private Node addWaiter(Node mode) { // 新建Node Node node = new Node(Thread.currentThread(), mode); // CAS快速尝试添加尾节点(侥幸心理,万一成功了呢) Node pred = tail; if (pred != null) { node.prev = pred; //CAS设置尾节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //多次尝试 enq(node); return node; } private Node enq(final Node node) { //多次尝试,直到成功为止 for (;;) { Node t = tail; //tail不存在,设置为首节点 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
当线程被封装成Node节点成功追加到等待队列尾部后,为了节约CPU资源就需要将当前线程挂起了(被阻塞的线程如果支持可中断并且被中断,自动唤醒并抛出中断异常):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 final boolean acquireQueued(final Node node, int arg) { // 获取资源是否失败标记 boolean failed = true; try { //标记等待过程中是否被中断过 boolean interrupted = false; // 自旋 for (;;) { //拿到前驱节点 final Node p = node.predecessor(); /* * 如果前驱是head,说明自己排在第二位,有可能马上就被执行 * 所以再次尝试tryAcquire()获取,如果失败就挂起等待 * 当然有可能是第一位搞完了释放资源唤醒自己,也有可能被interrupt */ if (p == head && tryAcquire(arg)) { // 获取到资源后,把自己设置为head,也就是说head指向的永远是当前拿到资源的 setHead(node); // 断绝与前驱节点的联系,方便被GC回收 p.next = null; // 成功获取资源后将失败标记为false failed = false; // 返回等待过程中是否被中断过 return interrupted; } /* * 先去检查自己是否真的可以被挂起了,如果不符合条件会进入下一次循环直到符合为止 * 调用park()方法将自己挂起,直到被唤醒 * 唤醒后会返回是否被中断标记,方便下次return出去 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),取消结点在队列中的等待。 if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire方法,检查自己是否真的可以被挂起了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 拿到前驱节点的状态 int ws = pred.waitStatus; // 如果前驱节点的状态是SIGNAL,那么前驱节点执行完会自动唤醒自己,放心的将自身挂起就好了 if (ws == Node.SIGNAL) return true; // 如果前驱节点执行过程中放弃了(超时或者其他的),一直往前找,直到找到正常等待的状态节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt方法,就是挂起:
1 2 3 4 5 6 7 8 private final boolean parkAndCheckInterrupt() { //调用park()使线程进入waiting状态 LockSupport.park(this); //如果执行到这里,说明被唤醒,查看自己是不是被中断的。 return Thread.interrupted(); }
内部方法 主要方法
acquire(int arg):独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列
acquireInterruptibly(int arg):独占式获取同步状态(同上),如果被打断直接抛异常
tryAcquire(int arg):独占式获取同步状态(供开发者重写)
tryAcquireNanos(int arg,long nanosTimeout):独占式获取同步状态,增加超时限制
release(int arg):独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒
tryRelease(int arg):独占式释放同步状态(供开发者重写)
acquireShared(int arg):共享式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列
acquireSharedInterruptibly(int arg):共享式获取同步状态(同上),如果被打断直接抛异常
tryAcquireShared(int arg):共享式获取同步状态(供开发者重写)
tryAcquireSharedNanos(int arg,long nanosTimeout):共享式获取同步状态,增加超时限制
releaseShared(int arg):共享式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒
tryReleaseShared(int arg):共享式释放同步状态(供开发者重写)
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占
方法虽然很多,不过很容易进行区分
首先争夺锁的方式有独占和共享
每种方式又包含加锁、释放锁方法
加锁的方法又分为直接加锁、超时加锁、中断加锁
直接加锁与中断加锁内部调用对应try开头的加锁方法处理
try开头的加锁方法采用模板模式,具体实现由开发者自己重写实现
最后一个是否独占并占用的查询
共享资源获取释放 在需要开发者重写的获取资源方法中,独占式获取资源方法tryAcquire(int arg)返回值为boolean类型,仅仅需要告诉调用者获取成功还是失败即可 。
而共享式获取资源方法acquireShared(int arg)返回int类型,大于等于零表示成功,小于零则表示失败,因为是共享所以允许多个线程访问获取,但有些时候我们需要限制访问数量 。这就可以设置一个阈值,每次有线程进来时阈值-1消耗,当消耗为零的时候,后续线程就不允许访问了,直接进入等待队列。
同样的,共享式资源的释放相比较独占式逻辑也有不同,除了唤醒后继节点,还需要将阈值+1 。
独占式源码解析 acquire(获取锁)
1 2 3 4 5 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先尝试用重写的tryAcquire(arg)方法,由于独占锁同一时刻只允许一个线程持有,这就需要开发者在重写方法时要利用好state属性,确保拿到锁的线程返回true,在没有释放前其他线程访问返回false。如果返回false就将线程封装成一个独占式锁加入队列中,紧接着尝试挂起线程。
release(释放锁)
1 2 3 4 5 6 7 8 9 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
先尝试调用重写的tryRelease(int arg)释放锁,如果成功后判断自身状态,如果节点状态不等于0(也就是还没退出等待队列),调用unparkSuccessor方法释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void unparkSuccessor(Node node) { // 获取当前节点的状态 int ws = node.waitStatus; // 如果小于0,使用CAS设置为0,0代表退出等待队列 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 获取后继节点 Node s = node.next; // 如果没有后继节点,或者后继节点状态大于0,也就是说已经退出队列了 if (s == null || s.waitStatus > 0) { // 方便GC回收 s = null; // 不停的往后面找,直到找到状态正常的为止 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果找到了就唤醒 if (s != null) LockSupport.unpark(s.thread); }
这个方法的逻辑也很简单,使用CAS方式将自身节点状态设置为0,紧接着根据自身的waitStatus判断后继节点是否需要被唤醒,如果后继节点因为响应中断等情况放弃了,就继续往后找,直到找到可以背唤醒的节点线程。
acquireInterruptibly(获取锁并支持中断)
1 2 3 4 5 6 7 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
先判断是否中断状态,如果是直接抛异常。如果不是中断状态,进入doAcquireInterruptibly(arg)方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
代码逻辑与acquire几乎一致,AQS阻塞等待逻辑的老套路就是,如果等待线程的前驱节点不是head则使用park()挂起,在parkAndCheckInterrupt()中实现,紧接着下一行返回中断状态。处于挂起状态的线程如果被中断,会立刻结束挂起状态,因此在上面的代码中会满足第二个if判断,抛出中断异常。这里有个疑问,如果前驱节点是head,中断没做任何处理?
doAcquireNanos(获取锁并支持中断、超时) 进入方法前获取当前时间戳,每次循环再次获取当前时间戳用差值判断是否超时,就算是被挂起的,也是调用park(this,nanosTimeout)进行挂起,到达超时时间直接跳出自旋。其他逻辑和doAcquireInterruptibly()一致。
共享式源码解析 acquireShared(获取锁)
1 2 3 4 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
先尝试用重写的acquireShared(arg)方法,由于共享锁同一时刻时允许多个线程进行访问的,AQS将重写方法设计为支持同一时刻最大访问限制数,返回值的int类型,表示如果当前线程进入访问后还能剩余多少访问数,如果为负数证明已经没有访问名额了,只能阻塞等待。
releaseShared(释放锁)
1 2 3 4 5 6 7 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
先尝试用重写的tryReleaseShared(arg)释放锁,加锁的时候是对state(访问限制数)-1,那么释放锁自然是加回来,这时有可能很多线程都在释放锁,因此在重写方法里加值要使用CAS方式。释放成功就代表有资源空闲出来,调用doReleaseShared方法唤醒后续节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
在自旋的阶段,每一次循环的过程都是首先获得头结点,如果头结点不为空且不为尾结点(阻塞队列里面只有一个结点),那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0,因为是要释放资源了,它也不需要做什么了,所以转变为初始状态,然后去唤醒后继结点unparkSuccessor(h),如果结点状态一开始就是0,那么就给他转换成PROPAGATE状态,保证在后续获取资源的时候,还能够向后面传播(这一块不明白)。
tryAcquireSharedNanos(获取锁并支持中断、超时) 进入方法前获取当前时间戳,每次循环再次获取当前时间戳用差值判断是否超时,就算是被挂起的,也是调用park(this,nanosTimeout)进行挂起,到达超时时间直接跳出自旋。其他逻辑和tryAcquireShared()一致。
简单应用 看懂AQS的原理机制后,我们可以尝试自己写一个不可重入锁,首先定义一下锁资源(AQS中的state)的含义,0表示未被加锁,1表示已经加锁。直接上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class CustomLock { private Sync sync; // 自定义同步器 private static class Sync extends AbstractQueuedSynchronizer { // 判断是否锁定状态 @Override protected boolean isHeldExclusively() { return getState() == 1; } // 获取资源 @Override protected boolean tryAcquire(int arg) { // 使用CAS修改状态,如果成功设置当前资源为独占资源 if(compareAndSetState(0, 1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放资源 @Override protected boolean tryRelease(int arg) { //既然释放,肯定就是已占有状态了,为了代码健壮一点加层判断 if (getState() == 0) throw new IllegalMonitorStateException(); // 清空独占记录 setExclusiveOwnerThread(null); // 释放共享资源,tryRelease还没执行完,线程仍然持有锁,因此不需要CAS修改 setState(0); return true; } } // 在自定义加锁对象创建时,为其初始化一个同步器 public CustomLock(){ sync = new Sync(); } // 加锁 public void lock() { sync.acquire(1); } // 单次加锁尝试 public boolean tryLock() { return sync.tryAcquire(1); } // 释放锁 public void unlock(){ sync.release(1); } // 锁是否处于加锁状态 public boolean isLocked(){ return sync.isHeldExclusively(); } }
可重入锁在加锁、释放锁的时候需要对state进行加减操作,并且确保退出的时候state为零,再此期间其他线程访问时如果state大于等于零,则获取锁失败。由于这段代码设计的是不可重入锁,不需要记录次数,仅仅有加锁(1)和未加锁(0)俩中状态,因此lock()、tryLock()、unlock()方法传参随便写都可以,在内部类Sync重写AQS方法中已经写死。
利用AQS我们可以实现很多种同步机制,比如CountDownLatch、CyclicBarrier、Semaphore、Lock诸多实现类,都是利用AQS来实现。