AQS简介 AQS的全称是AbstractQueuedSynchronizer,类内部定义了一套多线程访问共享资源的同步器框架,Java许多同步类的实现都依赖于它,比如常用的ReentrantLock、Semaphore、CountDownLatch等,我们也可以利用AQS自己实现一个锁。
AQS类内部的核心为volatile int state(共享资源) 和CLH线程等待队列(阻塞队列) ,整个AQS类内部大量的方法都是围绕state、CLH队列在处理逻辑。
state state作为共享资源被应用在多线程竞争上,自带的volatile关键字可以保证可见性、有序性,在搭配CAS使用后可以保证操作的原子性。state初始状态为0,线程使用CAS对state+1成功后持有锁,后续每次重入state+1、退出state-1,state递减为0时代表锁释放。
CLH队列 当线程竞争失败后会被封装成Node节点加入CLH队列,CLH队列在AQS中是以前驱节点(head)、后驱节点(tail)俩个成员构成的Node类型链表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 // 前驱节点 private transient volatile Node head; // 后驱节点 private transient volatile Node tail; // 静态内部类Node static final class Node { 	/** 共享锁 */ 	static final Node SHARED = new Node(); 	/** 独占锁 */  	static final Node EXCLUSIVE = null; 	/** 表示线程已被取消 */ 	static final int CANCELLED = 1; 	/** 表示后续线程需要取消阻塞 */ 	static final int SIGNAL = -1; 	/** 表示线程在条件下等待 */ 	static final int CONDITION = -2; 	/** 表示下一个获取共享应无条件传播 */ 	static final int PROPAGATE = -3; 	/**  	 * 节点等待状态 	 * 等于0:该节点尚未被初始化完成 	 * 大于0:说明该线程中断或者等待超时,需要移除该线程 	 * 小于0:该线程处于可以被唤醒的状态 	 */ 	volatile int waitStatus;        /** 前驱节点 */ 	volatile Node prev;        /** 后继节点 */ 	volatile Node next;        /** 获取同步状态的线程 */ 	volatile Thread thread;        /** 将单向列表变成双向列表 */ 	Node nextWaiter;        // 是否为共享节点 	final boolean isShared() { 		return nextWaiter == SHARED; 	} 	// 获取前继节点,没有就抛出异常 	final Node predecessor() throws NullPointerException { 		Node p = prev; 		if (p == null) 			throw new NullPointerException(); 		else 			return p; 	} 	// 无参构造器 	Node() { 	}        // 构造器 	Node(Thread thread, Node mode) {     // Used by addWaiter 		this.nextWaiter = mode; 		this.thread = thread; 	}        // 构造器 	Node(Thread thread, int waitStatus) { // Used by Condition 		this.waitStatus = waitStatus; 		this.thread = thread; 	} } 
Node内部类主要通过waitStatus来表示状态,主要有五种状态 :
状态 
状态值 
描述 
 
 
INITAL 
0 
初始状态 
 
CANCELLED 
1 
此节点的后继节点(或即将)被阻塞,因此当前节点在释放或取消时必须取消对其后继节点的阻塞 
 
SIGNAL 
-1 
此节点的后继节点(或将很快)被阻塞(通过park),因此当前节点在释放或取消时必须取消对其后继节点的阻塞。为了避免争用,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞 
 
CONDITION 
-2 
节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中 
 
PROPAGATE 
-3 
与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态 
 
链表入列 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private Node addWaiter(Node mode) { 	// 新建Node 	Node node = new Node(Thread.currentThread(), mode); 	// CAS快速尝试添加尾节点(侥幸心理,万一成功了呢) 	Node pred = tail; 	if (pred != null) { 			node.prev = pred; 		//CAS设置尾节点 		if (compareAndSetTail(pred, node)) { 			pred.next = node; 			return node; 		} 	} 	//多次尝试 	enq(node); 	return node; }  private Node enq(final Node node) {        //多次尝试,直到成功为止        for (;;) {            Node t = tail;            //tail不存在,设置为首节点            if (t == null) {                if (compareAndSetHead(new Node()))                    tail = head;            } else {                //设置为尾节点                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }  
当线程被封装成Node节点成功追加到等待队列尾部后,为了节约CPU资源就需要将当前线程挂起了(被阻塞的线程如果支持可中断并且被中断,自动唤醒并抛出中断异常):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 final boolean acquireQueued(final Node node, int arg) { 	// 获取资源是否失败标记 	boolean failed = true; 	try { 		//标记等待过程中是否被中断过 		boolean interrupted = false; 		// 自旋 		for (;;) { 			//拿到前驱节点 			final Node p = node.predecessor(); 			/*  			 * 如果前驱是head,说明自己排在第二位,有可能马上就被执行            	 * 所以再次尝试tryAcquire()获取,如果失败就挂起等待            	 * 当然有可能是第一位搞完了释放资源唤醒自己,也有可能被interrupt            	 */ 			if (p == head && tryAcquire(arg)) { 				// 获取到资源后,把自己设置为head,也就是说head指向的永远是当前拿到资源的 				setHead(node); 				// 断绝与前驱节点的联系,方便被GC回收 				p.next = null; 				// 成功获取资源后将失败标记为false 				failed = false; 				// 返回等待过程中是否被中断过 				return interrupted; 			} 			/* 			 * 先去检查自己是否真的可以被挂起了,如果不符合条件会进入下一次循环直到符合为止 			 * 调用park()方法将自己挂起,直到被唤醒 			 * 唤醒后会返回是否被中断标记,方便下次return出去 			 */ 			if (shouldParkAfterFailedAcquire(p, node) && 				parkAndCheckInterrupt()) 				interrupted = true; 		} 	} finally { 		// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),取消结点在队列中的等待。 		if (failed) 			cancelAcquire(node); 	} } 
shouldParkAfterFailedAcquire方法,检查自己是否真的可以被挂起了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 	// 拿到前驱节点的状态 	int ws = pred.waitStatus; 	// 如果前驱节点的状态是SIGNAL,那么前驱节点执行完会自动唤醒自己,放心的将自身挂起就好了        if (ws == Node.SIGNAL)            return true;        // 如果前驱节点执行过程中放弃了(超时或者其他的),一直往前找,直到找到正常等待的状态节点        if (ws > 0) {            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    } 
parkAndCheckInterrupt方法,就是挂起:
1 2 3 4 5 6 7 8 private final boolean parkAndCheckInterrupt() { 	//调用park()使线程进入waiting状态 	LockSupport.park(this); 	//如果执行到这里,说明被唤醒,查看自己是不是被中断的。 	return Thread.interrupted(); } 
内部方法 主要方法 
acquire(int arg):独占式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列 
acquireInterruptibly(int arg):独占式获取同步状态(同上),如果被打断直接抛异常 
tryAcquire(int arg):独占式获取同步状态(供开发者重写) 
tryAcquireNanos(int arg,long nanosTimeout):独占式获取同步状态,增加超时限制 
release(int arg):独占式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒 
tryRelease(int arg):独占式释放同步状态(供开发者重写) 
 
acquireShared(int arg):共享式获取同步状态,如果当前线程获取成功则返回,否则加入等待队列 
acquireSharedInterruptibly(int arg):共享式获取同步状态(同上),如果被打断直接抛异常 
tryAcquireShared(int arg):共享式获取同步状态(供开发者重写) 
tryAcquireSharedNanos(int arg,long nanosTimeout):共享式获取同步状态,增加超时限制 
releaseShared(int arg):共享式释放同步状态,释放后将同步队列中第一个节点包含的线程唤醒 
tryReleaseShared(int arg):共享式释放同步状态(供开发者重写) 
 
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占 
 
方法虽然很多,不过很容易进行区分 
首先争夺锁的方式有独占和共享 
每种方式又包含加锁、释放锁方法 
加锁的方法又分为直接加锁、超时加锁、中断加锁 
直接加锁与中断加锁内部调用对应try开头的加锁方法处理 
try开头的加锁方法采用模板模式,具体实现由开发者自己重写实现 
最后一个是否独占并占用的查询 
 
共享资源获取释放 获取成功还是失败即可 。
而共享式获取资源方法acquireShared(int arg)返回int类型,大于等于零表示成功,小于零则表示失败,因为是共享所以允许多个线程访问获取,但有些时候我们需要限制访问数量 。这就可以设置一个阈值,每次有线程进来时阈值-1消耗,当消耗为零的时候,后续线程就不允许访问了,直接进入等待队列。
同样的,共享式资源的释放相比较独占式逻辑也有不同,除了唤醒后继节点,还需要将阈值+1 。
独占式源码解析 acquire(获取锁) 
1 2 3 4 5 public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    } 
先尝试用重写的tryAcquire(arg)方法,由于独占锁同一时刻只允许一个线程持有,这就需要开发者在重写方法时要利用好state属性,确保拿到锁的线程返回true,在没有释放前其他线程访问返回false。如果返回false就将线程封装成一个独占式锁加入队列中,紧接着尝试挂起线程。
release(释放锁) 
1 2 3 4 5 6 7 8 9 public final boolean release(int arg) { 	if (tryRelease(arg)) { 		Node h = head; 		if (h != null && h.waitStatus != 0) 			unparkSuccessor(h); 		return true; 	} 	return false; } 
先尝试调用重写的tryRelease(int arg)释放锁,如果成功后判断自身状态,如果节点状态不等于0(也就是还没退出等待队列),调用unparkSuccessor方法释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void unparkSuccessor(Node node) { 	// 获取当前节点的状态        int ws = node.waitStatus;        // 如果小于0,使用CAS设置为0,0代表退出等待队列        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        // 获取后继节点        Node s = node.next;        // 如果没有后继节点,或者后继节点状态大于0,也就是说已经退出队列了        if (s == null || s.waitStatus > 0) {        	// 方便GC回收            s = null;            // 不停的往后面找,直到找到状态正常的为止            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        // 如果找到了就唤醒        if (s != null)            LockSupport.unpark(s.thread);    } 
这个方法的逻辑也很简单,使用CAS方式将自身节点状态设置为0,紧接着根据自身的waitStatus判断后继节点是否需要被唤醒,如果后继节点因为响应中断等情况放弃了,就继续往后找,直到找到可以背唤醒的节点线程。
acquireInterruptibly(获取锁并支持中断) 
1 2 3 4 5 6 7 public final void acquireInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (!tryAcquire(arg))            doAcquireInterruptibly(arg);    } 
先判断是否中断状态,如果是直接抛异常。如果不是中断状态,进入doAcquireInterruptibly(arg)方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void doAcquireInterruptibly(int arg)        throws InterruptedException {        final Node node = addWaiter(Node.EXCLUSIVE);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return;                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }    private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    } 
代码逻辑与acquire几乎一致,AQS阻塞等待逻辑的老套路就是,如果等待线程的前驱节点不是head则使用park()挂起,在parkAndCheckInterrupt()中实现,紧接着下一行返回中断状态。处于挂起状态的线程如果被中断,会立刻结束挂起状态,因此在上面的代码中会满足第二个if判断,抛出中断异常。这里有个疑问,如果前驱节点是head,中断没做任何处理?
doAcquireNanos(获取锁并支持中断、超时) 
共享式源码解析 acquireShared(获取锁) 
1 2 3 4 public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    } 
先尝试用重写的acquireShared(arg)方法,由于共享锁同一时刻时允许多个线程进行访问的,AQS将重写方法设计为支持同一时刻最大访问限制数,返回值的int类型,表示如果当前线程进入访问后还能剩余多少访问数,如果为负数证明已经没有访问名额了,只能阻塞等待。
releaseShared(释放锁) 
1 2 3 4 5 6 7 public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    } 
先尝试用重写的tryReleaseShared(arg)释放锁,加锁的时候是对state(访问限制数)-1,那么释放锁自然是加回来,这时有可能很多线程都在释放锁,因此在重写方法里加值要使用CAS方式。释放成功就代表有资源空闲出来,调用doReleaseShared方法唤醒后续节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared() { 	for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        } } 
在自旋的阶段,每一次循环的过程都是首先获得头结点,如果头结点不为空且不为尾结点(阻塞队列里面只有一个结点),那么先获得该节点的状态,如果是SIGNAL的状态,则代表它需要有后继结点去唤醒,首先将其的状态变为0,因为是要释放资源了,它也不需要做什么了,所以转变为初始状态,然后去唤醒后继结点unparkSuccessor(h),如果结点状态一开始就是0,那么就给他转换成PROPAGATE状态,保证在后续获取资源的时候,还能够向后面传播(这一块不明白)。
tryAcquireSharedNanos(获取锁并支持中断、超时) 
简单应用 看懂AQS的原理机制后,我们可以尝试自己写一个不可重入锁,首先定义一下锁资源(AQS中的state)的含义,0表示未被加锁,1表示已经加锁。直接上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class CustomLock { 	private Sync sync; 	// 自定义同步器 	private static class Sync extends AbstractQueuedSynchronizer { 		// 判断是否锁定状态 		@Override 		protected boolean isHeldExclusively() { 			return getState() == 1; 		} 		// 获取资源 		@Override 		protected boolean tryAcquire(int arg) { 			// 使用CAS修改状态,如果成功设置当前资源为独占资源 			if(compareAndSetState(0, 1)){ 				setExclusiveOwnerThread(Thread.currentThread()); 				return true; 			} 			return false; 		} 		// 释放资源 		@Override 		protected boolean tryRelease(int arg) { 		//既然释放,肯定就是已占有状态了,为了代码健壮一点加层判断 		if (getState() == 0) 			throw new IllegalMonitorStateException(); 		// 清空独占记录 		setExclusiveOwnerThread(null); 		// 释放共享资源,tryRelease还没执行完,线程仍然持有锁,因此不需要CAS修改 		setState(0); 		return true; 		} 	} 	// 在自定义加锁对象创建时,为其初始化一个同步器 	public CustomLock(){ 		sync = new Sync(); 	} 	// 加锁 	public void lock() { 		sync.acquire(1); 	} 	// 单次加锁尝试 	public boolean tryLock() { 		return sync.tryAcquire(1); 	} 	// 释放锁 	public void unlock(){ 		sync.release(1); 	} 	// 锁是否处于加锁状态 	public boolean isLocked(){ 		return sync.isHeldExclusively(); 	} } 
可重入锁在加锁、释放锁的时候需要对state进行加减操作,并且确保退出的时候state为零,再此期间其他线程访问时如果state大于等于零,则获取锁失败。由于这段代码设计的是不可重入锁,不需要记录次数,仅仅有加锁(1)和未加锁(0)俩中状态,因此lock()、tryLock()、unlock()方法传参随便写都可以,在内部类Sync重写AQS方法中已经写死。
利用AQS我们可以实现很多种同步机制,比如CountDownLatch、CyclicBarrier、Semaphore、Lock诸多实现类,都是利用AQS来实现。