logo头像
Snippet 博客主题

【Java并发编程实战】-Java线程池基础

本文于518天之前发表,文中内容可能已经过时

线程池是限制系统中执行线程的数量,并能够根据一定的策略回收线程并重复使用。JDK1.5之后加入了java.util.concurrent包,我们日常称之为J.U.C并发包,这个包对我们日常解决并发提供了非常大的帮助。

根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

专业解释:线程池管理一个工作者线程的同构池,线程池是与工作队列紧密绑定。所谓工作队列,其作用是持有所有等待执行的任务。工作者线程的生活从此轻松起来:它从工作队列中获取下一个任务,执行它,然后回来继续等待另一个线程。

显示创建线程弊端

在日常开发过程中,如果每次显示的new Thread().start(),会存在很大的性能缺陷:

  1. 线程生命周期的开销比较大。线程的创建与关闭都很耗时,对于计算机来讲,是个不容易的事情。
  2. 活动线程会很消耗系统资源,尤其是内存。如果可运行的线程数多于可用的处理器内核数量。如果大量线程空闲,会占用更多的内存,给垃圾回收带来压力。如果大量线程繁忙,导致大量线程同时竞争CPU资源,性能开销多,造成程序假死,计算机死机等危害。
  3. 稳定性和可维护性差。分散的线程多了无法集中式管理,从而导致意想不到的BUG,比如一直创建线程造成的OOM问题。

Executor框架

作为线程池的一部分,JDK1.5为我们提供了一个使用有界队列防止应用程序过载而耗尽内存的Executor框架,该线程池实现细节在java.util.comcurrent包下(简称并发包/J.U.C包)。在Java类库中,任务执行的首要抽象不是Thread,而是Executor,如下图所示的是简单Executor框架抽象依赖:

Executor只是简单的接口,这个框架可以用于异步任务执行,而且支持多种类型的任务执行策略。它为任务提交和任务执行之间的解耦提供了标准的方法,为使用Runnable描述任务提供了通用方式。Executor的实现还提供了对生命周期的支持以及钩子函数,可以添加诸如统计收集、应用程序管理机制和监视器等扩展。

Executor基于生产者-消费者模式。提交任务的执行者是生产者(产生待完成的工作单元),执行任务的线程是消费者(消耗掉这些工作单元)。Executor,任务的执行者,线程池框架中几乎所有类都直接或者间接实现Executor接口,它是线程池框架的基础。Executor提供了一种将“任务提交”与“任务执行”分离开来的机制,它仅提供了一个Execute()方法用来执行已经提交的Runnable任务。

Executor框架体系依赖图

Executor接口概述

1
2
3
4
5
6
7
8
package java.util.concurrent;
public interface Executor {
/**
* 方法描述:抽象出来的任务执行方法,方便后续不同类型的线程池的执行
* 参数描述:Runnable线程任务对象
*/
void execute(Runnable command);
}

ExcutorService接口概述

继承Executor,它是“执行者服务”接口,它是为”执行者接口Executor”服务而存在的。准确的地说,ExecutorService提供了”将任务提交给执行者的接口(submit方法)”,”让执行者执行任务(invokeAll, invokeAny方法)”的接口等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package java.util.concurrent;
import java.util.List;
import java.util.Collection;

public interface ExecutorService extends Executor {

/**
* 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
*/
void shutdown();

/**
* 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
*/
List<Runnable> shutdownNow();

/**
* 如果此执行程序已关闭,则返回 true。
*/
boolean isShutdown();

/**
* 如果关闭后所有任务都已完成,则返回 true
*/
boolean isTerminated();

/**
* 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future
*/
<T> Future<T> submit(Callable<T> task);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交可运行的任务以供执行,并返回未来的结果,并返回给给定结果
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
Future<?> submit(Runnable task);

/**
* 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService接口概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package java.util.concurrent;
import java.util.List;
import java.util.Collection;

public interface ExecutorService extends Executor {
/**
* 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
*/
void shutdown();

/**
* 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
*/
List<Runnable> shutdownNow();

/**
* 如果此执行程序已关闭,则返回 true。
*/
boolean isShutdown();

/**
* 如果关闭后所有任务都已完成,则返回 true.
*/
boolean isTerminated();

/**
* 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future
*/
<T> Future<T> submit(Callable<T> task);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
Future<?> submit(Runnable task);

/**
* 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService类概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package java.util.concurrent;
import java.util.*;

public abstract class AbstractExecutorService implements ExecutorService {

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}


public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}

for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}

抽象类,实现ExecutorService接口,为其提供默认实现。AbstractExecutorService除了实现ExecutorService接口外,还提供了两个newTaskFor()重载方法,一个支持Runnable任务,一个支持带有执行结果的Callable任务,执行后返回一个RunnableFuture,在运行的时候,它将调用底层可调用任务,作为 Future 任务,它将生成可调用的结果作为其结果,并为底层任务提供取消操作。

同时该抽象类也提供了两个submit()方法,一个只提交Runnable任务,一个提交Runnable的任务并将返回的未来的默认值封装到第二个返回参数中。

ScheduledExecutorService接口概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package java.util.concurrent;


public interface ScheduledExecutorService extends ExecutorService {
/**
* 创建并执行在给定延迟后启用的 ScheduledFuture。
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

/**
* 创建并执行在给定延迟后启用的一次性操作。
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;
* 也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay+2* period 后执行,依此类推。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);

/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);

}

ThreadPoolExecutor类概述

我们Java线程池主要是围绕ThreadPoolExecutor类展开的,后续会对ThreadPoolExecutor详解

Executors类概述

和ThreadPoolExecutor一样后续会详解,该类是线程池的静态工厂类,提供了Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 、Callable 等类的静态工厂方法,通过这些工厂方法我们可以得到相对应的对象。

  1. 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  2. 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  3. 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  4. 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  5. 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

Future接口

Future接口和实现Future接口的FutureTask代表了线程池的异步计算结果。

AbstractExecutorService提供了newTaskFor()方法返回一个RunnableFuture,除此之外当我们把一个Runnable或者Callable提交给(submit())ThreadPoolExecutor或者ScheduledThreadPoolExecutor时,他们则会向我们返回一个FutureTask对象。如下:

1
2
3
4
5
6
7
8
9
10
11
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)

Future接口概述

作为异步计算的顶层接口,Future对具体的Runnable或者Callable任务提供了三种操作:执行任务的取消、查询任务是否完成、获取任务的执行结果。其接口定义如下:

Future依赖关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package java.util.concurrent;

public interface Future<V> {

/**
* 试图取消对此任务的执行
* 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。
* 当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。
* 如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 如果在任务正常完成前将其取消,则返回 true
*/
boolean isCancelled();

/**
* 如果任务已完成,则返回 true
*/
boolean isDone();

/**
* 如有必要,等待计算完成,然后获取其结果
*/
V get() throws InterruptedException, ExecutionException;

/**
* 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture接口概述

继承Future、Runnable两个接口,为两者的合体,即所谓的Runnable的Future。提供了一个run()方法可以完成Future并允许访问其结果,其源码如下:

1
2
3
4
5
6
package java.util.concurrent;

public interface RunnableFuture<V> extends Runnable, Future<V> {
//在未被取消的情况下,将此 Future 设置为计算的结果
void run();
}

FutureTask接口概述

实现RunnableFuture接口,既可以作为Runnable被执行,也可以作为Future得到Callable的返回值。

参考资料

支付宝打赏 微信打赏

请作者喝杯咖啡吧