简介

线程与线程之间不是相互独立的个体,有些时候需要相互通信来共同完成某个业务场景,多线程之间通信总体来说分为共享内存和消息通信机制

wait/notify

概念

wait/notify采用消息通信机制来进行线程间的通信,某个线程必须达到特定条件才能继续执行下去,没有达到就将自己挂起等待,另一个线程的执行过程中会使条件达成并通知挂起等待的线程继续执行下去。

wait/notify都属于Object的方法,利用java自带的对象加锁机制争夺对应monitor,当线程不满足执行条件时调用Object的wait方法将自己挂起在monitor对象的_WaitSet上,其他线程在执行过程中将条件满足,紧接着使用Object的notify或notifyAll方法唤醒前述的等待线程,重新加入锁的竞争。

使用场景

例如线程独有的join()方法就是通过wait/notify实现线程的合并(非异步调用),在join线程执行过程中调用者线程只能等待,为了避免CPU的浪费,使用wait()方法将自己挂起在join线程的monitor对象的_WaitSet中,当join线程执行完毕后使用notify()唤醒调用者线程,继续往下执行。

简单的使用场景比如RocketMQ拉取消息时的长轮询机制,在拉取不到消息的时候将其挂起,直到Producer向本结点投递消息时,唤醒挂起的请求线程,拉取消息并返回。

复杂点的使用场景例如生产者/消费者模式,消费者线程使用while循环监听消息,如果消息队列为空则使用wait()将自己挂起,同样避免忙等造成CPU的浪费,生产者线程每次生产完数据都必须调用notify()方法,唤醒因消息队列为空而将自己挂起的消费者线程。下面是一段基于wait/notify机制的生产/消费模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Test {

private static Object obj = new Object();

private static final Queue<String> messageQueue = new LinkedBlockingDeque<>();

public static void main(String[] args) throws Exception {

Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {

for (int i = 1; i <= 9; i++) {

// 生产消息
synchronized (obj) {
messageQueue.add("第" + i + "条消息");
obj.notify();
}

// 每生产三条暂停1秒
if (i > 1 && i % 3 == 0) {
try {
System.out.println("暂停");
Thread.currentThread().sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// ...

}
}
// ...
});

Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {

synchronized (obj) {
try {
while (true) {

if (messageQueue.isEmpty()) {
obj.wait();
}

String message = messageQueue.poll();
System.out.println("消费者:" + message);
}
} catch (InterruptedException e) {
// ...
}
}
}

// ...
});

producerThread.start();
consumerThread.start();
}
}

使用细节

为什么wait()、notify()、notifyAll()必须在同步代码块中?
这三个方法都是对对象的monitor中的_WaitSet进行操作,而进入同步代码块意味着已经持有对象锁,也就持有了monitor,才有资格对_WaitSet进行操作,因此必须在同步代码块中。

为什么wait()方法要放在while()循环而不是if中?
被唤醒后线程从wait()代码之后继续执行,但是并不能保证每次被唤醒都是符合继续执行条件的,用while()被唤醒还会继续判断,不符合条件永远在while()中,而if不会。在N个线程通信的情况下,不能保证那一时刻条件被某个线程改变。

为什么wait()、notify()要定义在Object中而不是线程中?
wait()与notify()的基本思想是把某个对象作为联络点,利用锁机制拿到monitor进行联络通信,而java提供的锁是对象级的而不是线程级的,锁属于对象而不是线程专有,因此wait()、notify()、notifyAll()这种锁级别操作属于Object而不是线程专有方法。

lock/condition

概念

既然java支持使用锁进行线程通信,synchronized可以,Lock必然也可以。lock/condition与wait/notify功能类似,通过Lcok对象创建Condition对象,利用Condition对象的await()与signal()方法来阻塞唤醒。

使用场景

简单总结

评论