简介

解释线程池之前要先说一下池化技术,池化技术简单点来说,就是提前保存大量的资源,以备不时之需。而线程池就是利用池化技术保存线程资源的容器,同样也是Java多线程编程的重要基础。

多线程优点:

  • 避免线程频繁的创建以及销毁带来的资源浪费
  • 提高响应速度,任务到达时提前保存的线程可以立即执行,不需要等待临时创建
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。

继承体系

图片

Executor:

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor类是线程池顶级接口,只定义了一个执行无返回值任务的方法。

ExecutorService:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
	public interface ExecutorService extends Executor {

// 关闭线程池,不再接受新任务,但已经提交的任务会执行完成
void shutdown();

// 立即关闭线程池,尝试停止正在运行的任务,未执行的任务将不再执行
// 被迫停止及未执行的任务将以列表的形式返回
List<Runnable> shutdownNow();

// 检查线程池是否已关闭
boolean isShutdown();

// 检查线程池是否已终止,只有在shutdown()或shutdownNow()之后调用才有可能为true
boolean isTerminated();

// 在指定时间内线程池达到终止状态了才会返回true
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 执行有返回值的任务,任务的返回值为task.call()的结果
<T> Future<T> submit(Callable<T> task);

// 执行有返回值的任务,任务的返回值为这里传入的result,相当于给指针赋值
// 当然只有当任务执行完成了调用get()时才会返回
<T> Future<T> submit(Runnable task, T result);

// 执行有返回值的任务,返回值.get()为线程返回值
Future<?> submit(Runnable task);

// 批量执行任务,只有当这些任务都完成了这个方法才会返回,可以获取线程ID来区分集合中的返回值
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 在指定时间内批量执行任务,未执行完成的任务将被取消
// 这里的timeout是所有任务的总时间,不是单个任务的时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 返回任意一个已完成任务的执行结果,未执行完成的任务将被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 在指定时间内如果有任务已完成,则返回任意一个已完成任务的执行结果,未执行完成的任务将被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService类仍然只是接口,在Executor的基础上增加了关闭线程池、池内线程执行等相关操作。

ScheduledExecutorService:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
	public interface ScheduledExecutorService extends ExecutorService {

// 在指定延时后执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

// 在指定延时后执行一次
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

// 在指定延时后开始执行,并在之后以指定时间间隔重复执行(间隔不包含任务执行的时间)
// 无论任务是否完成,只要到时间就执行下一次
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

// 在指定延时后开始执行,并在之后以指定延时重复执行(间隔包含任务执行的时间)
// 上次任务结束才开始倒计时,只可能一个线程在工作
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}

ScheduledExecutorService接口在ExecutorService的基础上增加了定时任务的相关功能,这些定时功能又分为单次执行和重复执行。

AbstractExecutorService:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
public class ThreadPoolExecutor extends AbstractExecutorService{

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}

for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}

AbstractExecutorService是个抽象类,首先重写了ExecutorService类的submit()、invokeAny()、invokeAll()方法,另外还提供了一个newTaskFor方法用于构建RunnableFuture对象。

ThreadPoolExecutor:

1
2
3
public class ThreadPoolExecutor extends AbstractExecutorService {

}

ThreadPoolExecutor是一个普通类,也是我们使用线程池时需要创建的实例,内部集成了AbstractExecutorService抽象类,也就意味着它包含了以上介绍的所有接口(除了ScheduledExecutorService)的处理逻辑,也是本章节要着重要分析的类。

ScheduledThreadPoolExecutor:

1
2
3
4
5
	public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 属性方法..
}

ScheduledThreadPoolExecutor类看继承实现关系就能看明白,在继承了ThreadPoolExecutor类所有功能的情况下,通过实现ScheduledExecutorService接口又增加了线程定时任务执行的相关逻辑。

ForkJoinPool:

1
2
3
public class ForkJoinPool extends AbstractExecutorService {
// 属性方法..
}

ForkJoinPool比较适合计算密集型的任务,以后有机会用到的话再写。

构造器参数

在ThreadPoolExecutor类中有4个构造器,但最终调用的是如下这个构造器:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize:
核心线程大小,即在没有任务需要执行的时候线程池的大小,当所有核心线程都在执行任务时仍然有新任务提交,会直接进入阻塞队列。除非调用allowCoreThreadTimeOut()方法设置为true,这种情况下核心线程数空闲下来也会被回收掉。另外核心线程默认是懒加载模式,只有等到有任务的时候才会启动,比如常见的数据库连接池,在启动项目后首次访问数据库会打印{dataSource-1} inited日志,其实就是在懒加载核心线程,除非你调用prestartCoreThread()或prestartAllCoreThreads()方法提前启动核心线程。

maximumPoolSize:
线程池中允许的最大线程数,如果说corePoolSize是控制同一时刻线程执行数量的下限,maximumPoolSize就是与之对应的上限。当阻塞队列已满并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是getLargestPoolSize()方法,调用该方法会返回线程池在整个生命周期中曾经出现的最大线程个数。

keepAliveTime:
线程空闲时的存活时间,当线程持续keepAliveTime时间处于空闲状态时,这个空闲线程会被销毁。默认情况下,该参数只会对非核心线程生效,如果调用allowCoreThreadTimeOut()被设置为true时,无论线程数多少,线程处于空闲状态超过一定时间就会被销毁掉。

unit:
keepAliveTime的单位,TimeUnit是一个枚举类型,具体哪些就没必要讲了。

workQueue:
阻塞队列,当所有核心线程都在执行任务时仍然有新任务提交时,会加入此队列等待。构造器中阻塞队列的范型必须是Runnable类型,换句话说只有实现Runnable接口的类才可以加入阻塞队列,这个下面会单独讲。

threadFactory:
线程工厂,用于新线程的创建,创建时可以设定线程名、是否为daemon线程等等。

handler:
拒绝策略,当阻塞队列已满并且线程池中的线程数量也达到最大限制,必须采取一种策略处理该任务。线程池提供了四种决绝策略,如果仍然无法满足业务需求,还可以通过实现RejectedExecutionHandler接口自定义拒绝策略。

阻塞队列

线程池允许设置的阻塞队列对象,用来保存等待被执行的任务的阻塞队列,队列全部都是BlockingQueue接口的实现类,并且范型必须实现Runable接口,如下阻塞队列:

ArrayBlockingQueue(有界队列):
是一个基于数组实现的的阻塞队列,队列长度在创建后固定不可修改,此队列按照先进先出(FIFO)的原则对元素进行排序。ArrayBlockingQueue插入数据和获取数据,需要竞争到锁才可以执行,也就意味着这俩个操作无法并行执行,另外可以通过构造器参数设置竞争的公平性。

LinkedBlockingQueue(无界队列):
是一个基于链表实现的的阻塞队列,如果在创建时没有在构造器中指定容量,那么容量默认为Integer.MAX_VALUE。LinkedBlockingQueue队列的插入和消费元素采用分离的锁控制,也就意味着这俩种操作可以并行执行,整体的吞吐性能要高于ArrayBlockingQueue,当队列达到最大容量时,插入元素的线程会进入阻塞,直到队列的元素被消费掉腾出空间才会被唤醒继续执行。

DelayQueue(延迟队列):
此队列中在长度方面没有任何限制,因此往队列插入元素时不会产生任何阻塞,如果线程池任务想要加入此队列除了要实现Runnable接口外,还需要实现Delayed接口。获取元素时,只有当元素的延迟时间到了才可以从队列中获取到该元素,否则会进入阻塞。

PriorityBlockingQueue(优先级队列):
此队列在长度方面仍然没有限制,插入元素操作也不会产生任何阻塞,如果线程池任务想要加入此队列除了要实现Runnable接口外,还需要实现Compator接口。在使用没有容量限制的队列时一定要注意,插入元素的速度绝对不能大于消费元素的速度,否则随着时间的积累,会耗尽系统的内存资源造成内存泄漏。

SynchronousQueue(无缓冲等待队列):
队列中仅保存一个元素,只有对列为空才可以添加元素,之后只有等待元素被消费才可以继续添加。拥有公平(FIFO)和非公平(LIFO)策略,可以在创建时指定。

拒绝策略

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务。拒绝策略并不是设置就一定生效,比如阻塞队列选择无界的情况下,基本上队列堆积的任务没有到达Inteher.MAX_VALUE时,内存就爆了。java提供了四种线程池拒绝策略,当然你也可以通过继承这四个策略类,或者实现RejectedExecutionHandler接口自定义拒绝策略:

AbortPolicy(异常中止策略):
直接抛出RejectedExecutionException异常,也是线程池的默认拒绝策略。这种策略使用的时候要处理好抛出的异常,避免调用线程池的主线程因为异常打断后续的执行流程。

DiscardPolicy(丢弃策略):
直接丢弃任务,因为此类在实现RejectedExecutionHandler接口并重写的rejectedExecution方法中啥都没做。如果你提交的任务无关紧要,可以选择使用此策略,我个人感觉这个策略几乎用不上,但凡有点良心的开发都会打印一行日志意思意思…

DiscardOldestPolicy(弃老策略):
放弃阻塞队列中最靠前的任务,并尝试让线程池执行当前线程。这种策略仍然会悄悄的丢掉任务,只不过保证新产生的任务优先执行,应该是满足特定场景使用的吧。

CallerRunsPolicy(调用者运行策略):
当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。原理是直接运行Runnable的run()方法,直接调用run()的方式都懂得,会阻塞调用者直到执行完毕。
这种方式看似比较稳妥,能保证所有的任务都会被执行,但是拒绝策略的rejectedExecution()方法是包含在线程池的execute()方法中调用,execute()在执行过程中会占用一条线程,如果多个线程进入此阻塞策略并且线程执行时间过长,会严重影响线程池处理任务的吞吐量。因此这种策略一般在不允许失败、对性能要求不高、并发亮较小的场景下使用。

自定义:
如果以上四种策略无法满足你的需求,那就需要考虑自定义策略了。比如dubbo的工作线程池自定义的拒绝策略是继承AbortPolicy类,打印完日志后调用父类抛异常、比如ActiveMQ中的拒绝策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常。

参数设置

线程池的使用难度不大,但用好线程池就需要对常用参数的含义有一定的理解,并且要考虑到应用程序所在服务器的CPU配置、任务的执行特点、任务执行过程中的内存使用率等,如果任务中涉及下游服务的调用,还要考虑到下游服务的抗并发能力等。可以将线程池要执行的任务进行分类:

CPU密集型:
例如内存中的计算、比较、转化等,尽量使用较小的线程池,一般为CPU核心数+1。即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费。因为CPU密集型任务使得CPU使用率很高,尽量减少线程之间竞争引起的上下文切换带来的资源浪费。

IO密集型:
例如网络IO(调用其他服务或接口)、磁盘IO(读写文件)等,可以使用稍大的线程池,一般为2*CPU核心数+1(如果调用下游服务还要考虑抗并发因素)。因为IO操作期间不占用CPU,不要让CPU闲下来,应加大线程数量,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。

混合型:
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。

注:通过公式推算出的线程池参数仅仅只是理想状态下的最优方案,实际最优参数需要根据服务器运行情况比,如线程执行过程中占用CPU时间、最大线程数峰值、拒绝策略出现频率等不断调整参数。

springboot线程池

配置自定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {

/**
* 自定义线程池,若不重写会使用默认的线程池
*/
@Bean("asyncExecutor")
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor asyncExecutor = new ThreadPoolTaskExecutor();
asyncExecutor.setCorePoolSize(16);
asyncExecutor.setMaxPoolSize(32);
asyncExecutor.setKeepAliveSeconds(180);
asyncExecutor.setQueueCapacity(200);
asyncExecutor.setThreadNamePrefix("buss-thread"); // 线程命名前缀
asyncExecutor.initialize();
return asyncExecutor;
}

/**
* 自定义拒绝策略
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

return new AbortPolicy();
}
}
注: @EnableAsync注解一定要加,否则线程池异步调用不生效。

配置异步任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class UserServiceImpl implements UserService {

@Override
public void updateById(UserTo to) {
// ...
}

@Async
@Override
public void asyncUpdateById(UserTo to) {
updateById(to);
}

@Async
@Override
public Future<Boolean> asyncUpdateById(UserTo to) {

try{
updateById(to);
} catch(){
return new AsyncResult<>(Boolean.FALSE);
}
return new AsyncResult<>(Boolean.TRUE);
}
}

如果想要使用异步方式调用某个方法,只需要加上@Async注解即可,如果需要返回值必须使用Future,其他返回值类型在调用后会立刻返回null,如果不需要返回值直接将方法返回值设置为void。最好封装一个异步方法,减少对原始方法的破坏,避免其他非异步使用的线程调用后返回null导致程序错误。

注:如果应用配置了多个线程池,则需要在@Async注解的value属性中指定线程池Bean名称,没有指定的情况下使用默认线程池(@primary注解的bean)

总结

评论

Powered By Valine
v1.4.14