多线程(十一) 同步工具类-CountDownLatch
基本概念 CountDownLatch是一种线程同步工具类,它允许一个或多个线程等待直到在其他线程中一组操作执行完成。你可以把它理解为一个计数器,对象被创建的时候指定总数,每有一个线程到达指定条件总数减1,当减到为0时代表所有线程都达到条件,所有等待线程被唤醒继续往下执行,因此CountDownlatch也被称为倒计时锁 。
使用场景 例如运营系统的流量、业务等统计功能,页面需要统计展示每日的新增用户量、订单数量、商品销售总量、商品销售总额等。如果每个统计类型的查询需要2秒,4个统计类型就需要8秒的时间才能返回给前端,用户显然是无法接受的。我们只需要将4个统计类型的查询由串行执行改为并行执行,等待所有线程都查询完在组装返回,那么整个请求的响应时间就缩短到的了2秒。
写个简单的Demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class Test { public static void main(String[] args) { long startTimeMillis = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(4); Map<String, Long> statisticsMap = new Hashtable<>(); // 1.查询新增用户量 new Thread(new Runnable() { @Override public void run() { // 模拟两秒查询 try { Thread.sleep(2000L); statisticsMap.put("addUserCount", 1000L); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 倒计时锁-1 countDownLatch.countDown(); } } }).start(); // 2.查询订单数量 new Thread(new Runnable() { @Override public void run() { // 模拟两秒查询 try { Thread.sleep(2000L); statisticsMap.put("orderCount", 248300L); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 倒计时锁-1 countDownLatch.countDown(); } } }).start(); // 3.查询商品销售总量 new Thread(new Runnable() { @Override public void run() { // 模拟两秒查询 try { Thread.sleep(2000L); statisticsMap.put("commodityCount", 300L); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 倒计时锁-1 countDownLatch.countDown(); } } }).start(); // 4.查询商品销售总额 new Thread(new Runnable() { @Override public void run() { // 模拟两秒查询 try { Thread.sleep(2000L); statisticsMap.put("totalSales", 9073180L); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 倒计时锁-1 countDownLatch.countDown(); } } }).start(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long takeTimeMillis = System.currentTimeMillis() - startTimeMillis; System.out.println("耗时:" + takeTimeMillis + "ms"); System.out.println("返回值:" + statisticsMap); } }
耗时:2006ms 返回值:{commodityCount=300, totalSales=9073180, orderCount=248300, addUserCount=1000}
构造器源码 1 2 3 4 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
CountDownLatch底层基于AQS实现,当我们调用CountDownLatch countDownLatch= new CountDownLatch(4) 创建一个实例时,会在对象内部创建一个继承AQS的Sync类,并将构造器的参数值赋值给state,所以state的值也代表CountDownLatch所剩余的计数次数。
Sync源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 根据计数值是否耗尽(为0就算耗尽),返回正数(1)或者负数(-1) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 共享式释放锁的逻辑重写,主要提供给countDown()使用 protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取当前state值 int c = getState(); // 如果state=0,说明计数值已经耗尽了,不需要继续释放 if (c == 0) return false; // 使用CAS方式-1 int nextc = c-1; // 如果减完为0,证明是最后一个释放的,返回true if (compareAndSetState(c, nextc)) return nextc == 0; } } }
Sync除了维护了state值以外,分别重写了tryAcquireShared()与tryReleaseShared()方法,主要提供给CountDownLatch的countDown()与await()方法调用。
countDown()源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 // 内部调用AQS的共享式释放锁 public void countDown() { sync.releaseShared(1); } // AQS的共享式释放锁 public final boolean releaseShared(int arg) { // if中的方法被CountDownLatch重写,仅当state不为0并且修改后为0时才返回true if (tryReleaseShared(arg)) { // 如果state修改后是0,说明自己是最后一个执行完毕的,需要唤醒所有等待的线程 doReleaseShared(); // countDown()方法并没有利用返回值做其他事情,可以无视 return true; } return false; }
countDown()方法的逻辑非常简单,就是利用静态内部类Sync的重写方法tryReleaseShared(),使用CAS方式对计数值(state)-1操作。如果返回true证明自身是最后一个执行完成的,还需要唤醒所有阻塞的等待线程。
await()源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // 内部调用AQS的共享式获取锁方式(支持中断) public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AQS共享式获取锁方式(支持中断) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果已经是中断状态,直接抛出来 if (Thread.interrupted()) throw new InterruptedException(); // 调用重写的共享式获取锁方法,如果返回值小于0证明计数值还没有耗尽,需要加入等待队列 if (tryAcquireShared(arg) < 0) // AQS的方法,前面已经解释过了,排队的第一个自旋等待,后面的挂起等待,直到tryAcquireShared()>=0 doAcquireSharedInterruptibly(arg); }
await()方法无非就是阻塞,第一个调用此方法的线程是自旋等待,直到计数值耗尽(state=0)跳出,如果有多个线程调用此方法等待,则使用park()函数挂起直到被唤醒。并且提供重载方法支持超时放弃,等待过程中支持中断响应。
await(timeout, unit)源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // 内部调用AQS的共享式获取锁方式(支持超时与中断) public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // AQS的共享式获取锁方式(支持超时与中断) public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 先做中断校验 if (Thread.interrupted()) throw new InterruptedException(); // 如果tryAcquireShared()方法返回值大于0,也就是已经计数值已耗尽(state=0) 直接返回就好了 // 如果没有耗尽,进入阻塞方法,也是AQS源码 不解释了... return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
在await()基础上增加超时功能,防止意外情况导致条件永远无法满足,等待线程一直阻塞。
总结 CountDownLatch的作用是牺牲运行内存(额外创建的线程需要额外的栈空间支出)以及CPU资源(请求过程中会有额外的线程加入CPU使用权争夺)来提高请求的响应效率。因此CountDownLatch不能盲目使用,要参考JVM大小、CPU核数等配置信息,还要估算接口的QPS,避免大量请求导致JVM栈溢出或CPU使用率到100%。
在创建CountDownLatch时,构造器参数值一定要和处理任务的子线程数相等,避免高于子线程数量造成死锁,或者低于子线程数造成部门数据丢失。子线程的countDown()方法最好放在finally代码块中,避免执行过程中出现异常导致没有被执行。为了保险起见,主线程最好使用支持超时的await()进行等待,彻底解决可能出现的死锁情况。