基本概念

CountDownLatch是一种线程同步工具类,它允许一个或多个线程等待直到在其他线程中一组操作执行完成。你可以把它理解为一个计数器,对象被创建的时候指定总数,每有一个线程到达指定条件总数减1,当减到为0时代表所有线程都达到条件,所有等待线程被唤醒继续往下执行,因此CountDownlatch也被称为倒计时锁

使用场景

例如运营系统的流量、业务等统计功能,页面需要统计展示每日的新增用户量、订单数量、商品销售总量、商品销售总额等。如果每个统计类型的查询需要2秒,4个统计类型就需要8秒的时间才能返回给前端,用户显然是无法接受的。我们只需要将4个统计类型的查询由串行执行改为并行执行,等待所有线程都查询完在组装返回,那么整个请求的响应时间就缩短到的了2秒。

写个简单的Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class Test {

public static void main(String[] args) {

long startTimeMillis = System.currentTimeMillis();

CountDownLatch countDownLatch = new CountDownLatch(4);

Map<String, Long> statisticsMap = new Hashtable<>();

// 1.查询新增用户量
new Thread(new Runnable() {
@Override
public void run() {

// 模拟两秒查询
try {
Thread.sleep(2000L);
statisticsMap.put("addUserCount", 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 倒计时锁-1
countDownLatch.countDown();
}
}
}).start();

// 2.查询订单数量
new Thread(new Runnable() {
@Override
public void run() {

// 模拟两秒查询
try {
Thread.sleep(2000L);
statisticsMap.put("orderCount", 248300L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 倒计时锁-1
countDownLatch.countDown();
}
}
}).start();


// 3.查询商品销售总量
new Thread(new Runnable() {
@Override
public void run() {

// 模拟两秒查询
try {
Thread.sleep(2000L);
statisticsMap.put("commodityCount", 300L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 倒计时锁-1
countDownLatch.countDown();
}
}
}).start();


// 4.查询商品销售总额
new Thread(new Runnable() {
@Override
public void run() {

// 模拟两秒查询
try {
Thread.sleep(2000L);
statisticsMap.put("totalSales", 9073180L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 倒计时锁-1
countDownLatch.countDown();
}
}
}).start();

try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

long takeTimeMillis = System.currentTimeMillis() - startTimeMillis;
System.out.println("耗时:" + takeTimeMillis + "ms");
System.out.println("返回值:" + statisticsMap);
}

}

耗时:2006ms
返回值:{commodityCount=300, totalSales=9073180, orderCount=248300, addUserCount=1000}

构造器源码

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

CountDownLatch底层基于AQS实现,当我们调用CountDownLatch countDownLatch= new CountDownLatch(4) 创建一个实例时,会在对象内部创建一个继承AQS的Sync类,并将构造器的参数值赋值给state,所以state的值也代表CountDownLatch所剩余的计数次数。

Sync源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

// 根据计数值是否耗尽(为0就算耗尽),返回正数(1)或者负数(-1)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 共享式释放锁的逻辑重写,主要提供给countDown()使用
protected boolean tryReleaseShared(int releases) {

// 自旋
for (;;) {

// 获取当前state值
int c = getState();

// 如果state=0,说明计数值已经耗尽了,不需要继续释放
if (c == 0)
return false;

// 使用CAS方式-1
int nextc = c-1;

// 如果减完为0,证明是最后一个释放的,返回true
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

Sync除了维护了state值以外,分别重写了tryAcquireShared()与tryReleaseShared()方法,主要提供给CountDownLatch的countDown()与await()方法调用。

countDown()源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

// 内部调用AQS的共享式释放锁
public void countDown() {
sync.releaseShared(1);
}

// AQS的共享式释放锁
public final boolean releaseShared(int arg) {

// if中的方法被CountDownLatch重写,仅当state不为0并且修改后为0时才返回true
if (tryReleaseShared(arg)) {

// 如果state修改后是0,说明自己是最后一个执行完毕的,需要唤醒所有等待的线程
doReleaseShared();

// countDown()方法并没有利用返回值做其他事情,可以无视
return true;
}
return false;
}

countDown()方法的逻辑非常简单,就是利用静态内部类Sync的重写方法tryReleaseShared(),使用CAS方式对计数值(state)-1操作。如果返回true证明自身是最后一个执行完成的,还需要唤醒所有阻塞的等待线程。

await()源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// 内部调用AQS的共享式获取锁方式(支持中断)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// AQS共享式获取锁方式(支持中断)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {

// 如果已经是中断状态,直接抛出来
if (Thread.interrupted())
throw new InterruptedException();

// 调用重写的共享式获取锁方法,如果返回值小于0证明计数值还没有耗尽,需要加入等待队列
if (tryAcquireShared(arg) < 0)
// AQS的方法,前面已经解释过了,排队的第一个自旋等待,后面的挂起等待,直到tryAcquireShared()>=0
doAcquireSharedInterruptibly(arg);
}

await()方法无非就是阻塞,第一个调用此方法的线程是自旋等待,直到计数值耗尽(state=0)跳出,如果有多个线程调用此方法等待,则使用park()函数挂起直到被唤醒。并且提供重载方法支持超时放弃,等待过程中支持中断响应。

await(timeout, unit)源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// 内部调用AQS的共享式获取锁方式(支持超时与中断)
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// AQS的共享式获取锁方式(支持超时与中断)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 先做中断校验
if (Thread.interrupted())
throw new InterruptedException();

// 如果tryAcquireShared()方法返回值大于0,也就是已经计数值已耗尽(state=0) 直接返回就好了
// 如果没有耗尽,进入阻塞方法,也是AQS源码 不解释了...
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

在await()基础上增加超时功能,防止意外情况导致条件永远无法满足,等待线程一直阻塞。

总结

CountDownLatch的作用是牺牲运行内存(额外创建的线程需要额外的栈空间支出)以及CPU资源(请求过程中会有额外的线程加入CPU使用权争夺)来提高请求的响应效率。因此CountDownLatch不能盲目使用,要参考JVM大小、CPU核数等配置信息,还要估算接口的QPS,避免大量请求导致JVM栈溢出或CPU使用率到100%。

在创建CountDownLatch时,构造器参数值一定要和处理任务的子线程数相等,避免高于子线程数量造成死锁,或者低于子线程数造成部门数据丢失。子线程的countDown()方法最好放在finally代码块中,避免执行过程中出现异常导致没有被执行。为了保险起见,主线程最好使用支持超时的await()进行等待,彻底解决可能出现的死锁情况。

评论