logo头像
Snippet 博客主题

【【Java并发编程实战】线程池应用之Executors线程池静态工厂

本文于517天之前发表,文中内容可能已经过时

今天我们来认识一下,线程池的静态工厂Executors类,平时写测试代码的时候可用一用,生产代码尽量少用。为什么说生产上尽量少用呢?这是有原因的,请耐心听彤哥讲解。

阿里的代码规约不建议使用这样的工厂类,而建议使用手动创建并管理ThreadPoolExecutor,在阿里巴巴Java开发手册中,编程规约下的并发处理规约第4点如下:

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:

1)FixedThreadPoolSingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

2)CachedThreadPoolScheduledThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

正例 1:

1
2
3
4
> //org.apache.commons.lang3.concurrent.BasicThreadFactory
> ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
> new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
>

正例 2:

1
2
3
4
5
6
7
8
9
> ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
> .setNameFormat("demo-pool-%d").build();
> //Common Thread Pool
> ExecutorService pool = new ThreadPoolExecutor(5, 200,
> 0L, TimeUnit.MILLISECONDS,
> new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
> pool.execute(()-> System.out.println(Thread.currentThread().getName()));
> pool.shutdown();//gracefully shutdown
>
1
2
3
4
5
6
7
8
9
10
11
12
13
> 正例 3:
> <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
> <property name="corePoolSize" value="10" />
> <property name="maxPoolSize" value="100" />
> <property name="queueCapacity" value="2000" />
> <property name="threadFactory" value= threadFactory />
> <property name="rejectedExecutionHandler">
> <ref local="rejectedExecutionHandler" />
> </property>
> </bean>
> //in code
> userThreadPool.execute(thread);
>

看到如下的规约,我们是不是没必要往下学了?答案是当然不行的,我们虽然不怎么用,但是研究其源码,可以让我们更深入线程池原理(增强内力),使得我们以后更容易设计线程池并能够高效管理线程,以至于不像代码规约描述的那样:用而不当导致OOM的问题。

源码分析

好了,话不多说,直接上菜(源码),够我们今天喝几壶了,哈哈。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
package java.util.concurrent;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.security.PrivilegedActionException;
import java.security.AccessControlException;
import sun.security.util.SecurityConstants;

/**
* Executors线程池静态工厂类
*/
public class Executors {

/**
* 创建重用固定数量线程的线程池,使用的是共享无参无边界队列。
* 在任何时候,最多线程将是活动的处理任务。
* 如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到线程可用。
* 在关闭之前,如果任何线程在执行期间因失败而终止,如果需要,将更换一个新的,执行后续任务。
* 池中的线程将存在直到它显式executerservice关闭。
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

/**
* 该方法是JDk8提供的。特殊的线程池,采用JDK1.7的ForkJoinPool实现
* 创建一个线程池,以维护足够的线程来支持给定的并行级别,并且可以使用多个队列来减少争用。
* 平行度级别对应于主动参与或可用于的线程的最大数目从事,任务处理。
* 实际线程数可能动态增长和收缩。偷工减料是不行的,保证提交任务的顺序执行。
*
* @param 并行级别
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

/**
* 重载方法: 只是默认参数用的是当前机器可利用CPU核数
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

/**
* 重载方法: 创建重用固定数量线程的线程池
* 只不过参数多了一个,可以自定义线程池工厂
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

/**
* 创建使用单个工作线程操作的执行器,即单线程工作模式,不可配置线程的大小。
* 也是在一个无边界的队列中运行的。相当于newFixedThreadPool(1),
* 在任何时刻,当前线程池只有1个活动线程,不可能出现其他的工作线程。
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

/**
* 重载方法: 创建使用单个工作线程操作的执行器,即单线程工作模式,不可配置线程的大小。
* 只不过可以自定义线程池工厂
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

/**
* 创建一个可重用的线程池: 根据需要创建新线程,但将重用以前构造的线程可用。
* 执行许多短期异步任务的程序,使用这类线程池可以提升程序性能。
* 如果没有线程可用,则线程被创建并添加到池中。
* 具有60秒未使用的线程,线程池则回收该线程。
* 如果线程可用,则重用之前的活动线程。
* 如果使用不当可能导致OOM。
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

/**
* 重载方法: 创建一个可重用的线程池
* 只不过可以自定义线程池工厂
* 如果使用不当可能导致OOM。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

/**
* 创建一个单线程的延迟任务线程池,可代替Timer(使用繁琐,线程不安全类),
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

/**
* 重载方法: 创建一个单线程的延迟任务线程池,可代替Timer(使用繁琐,线程不安全类),
* 只不过可以自定义线程池工厂
*
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

/**
* 重载方法: 创建一个延迟任务线程池
* 只不过可以自定义线程池核心大小
*
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
* 重载方法: 创建一个延迟任务线程池
* 只不过可以自定义线程池核心大小和最大线程数量
*
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

/**
* Returns an object that delegates all defined {@link
* ExecutorService} methods to the given executor, but not any
* other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return an {@code ExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}

/**
* Returns an object that delegates all defined {@link
* ScheduledExecutorService} methods to the given executor, but
* not any other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return a {@code ScheduledExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}

/**
* Returns a default thread factory used to create new threads.
* This factory creates all new threads used by an Executor in the
* same {@link ThreadGroup}. If there is a {@link
* java.lang.SecurityManager}, it uses the group of {@link
* System#getSecurityManager}, else the group of the thread
* invoking this {@code defaultThreadFactory} method. Each new
* thread is created as a non-daemon thread with priority set to
* the smaller of {@code Thread.NORM_PRIORITY} and the maximum
* priority permitted in the thread group. New threads have names
* accessible via {@link Thread#getName} of
* <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
* number of this factory, and <em>M</em> is the sequence number
* of the thread created by this factory.
* @return a thread factory
*/
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}

/**
* Returns a thread factory used to create new threads that
* have the same permissions as the current thread.
* This factory creates threads with the same settings as {@link
* Executors#defaultThreadFactory}, additionally setting the
* AccessControlContext and contextClassLoader of new threads to
* be the same as the thread invoking this
* {@code privilegedThreadFactory} method. A new
* {@code privilegedThreadFactory} can be created within an
* {@link AccessController#doPrivileged AccessController.doPrivileged}
* action setting the current thread's access control context to
* create threads with the selected permission settings holding
* within that action.
*
* <p>Note that while tasks running within such threads will have
* the same access control and class loader settings as the
* current thread, they need not have the same {@link
* java.lang.ThreadLocal} or {@link
* java.lang.InheritableThreadLocal} values. If necessary,
* particular values of thread locals can be set or reset before
* any task runs in {@link ThreadPoolExecutor} subclasses using
* {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}.
* Also, if it is necessary to initialize worker threads to have
* the same InheritableThreadLocal settings as some other
* designated thread, you can create a custom ThreadFactory in
* which that thread waits for and services requests to create
* others that will inherit its values.
*
* @return a thread factory
* @throws AccessControlException if the current access control
* context does not have permission to both get and set context
* class loader
*/
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns {@code null}.
* @param task the task to run
* @return a callable object
* @throws NullPointerException if task null
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given privileged action and returns its result.
* @param action the privileged action to run
* @return a callable object
* @throws NullPointerException if action null
*/
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given privileged exception action and returns
* its result.
* @param action the privileged exception action to run
* @return a callable object
* @throws NullPointerException if action null
*/
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}

/**
* Returns a {@link Callable} object that will, when called,
* execute the given {@code callable} under the current access
* control context. This method should normally be invoked within
* an {@link AccessController#doPrivileged AccessController.doPrivileged}
* action to create callables that will, if possible, execute
* under the selected permission settings holding within that
* action; or if not possible, throw an associated {@link
* AccessControlException}.
* @param callable the underlying task
* @param <T> the type of the callable's result
* @return a callable object
* @throws NullPointerException if callable null
*/
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}


public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}

// Non-public classes supporting the public methods

/**
* 运行给定任务并返回给定结果的可调用文件
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

/**
* 在已建立的访问控制设置下运行的可调用文件
*/
static final class PrivilegedCallable<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;

PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}

public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}

/**
* 在已建立的访问控制设置和当前类加载器下运行的可调用文件。
*/
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;

PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

// Whether setContextClassLoader turns out to be necessary
// or not, we fail fast if permission is not available.
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.task = task;
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}

public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
return task.call();
} else {
t.setContextClassLoader(ccl);
try {
return task.call();
} finally {
t.setContextClassLoader(cl);
}
}
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}

/**
* 默认线程工厂
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

/**
* 线程工厂捕获访问控制上下文和类加载器
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;

PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}

public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}

/**
* 仅公开ExecutorService方法的包装类
* 执行器服务的实现类
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}

/**
* 仅公开ScheduledExecutorService的包装类
* ScheduledExecutorService实现的方法
*/
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

/** 不允许实例化 */
private Executors() {}
}

常用方法

FixedThreadPool

FixedThreadPool,可重用固定线程数的线程池。

corePoolSize 和 maximumPoolSize都设置为创建FixedThreadPool时指定的参数nThreads,意味着当线程池满时且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略,该线程池不会再新建线程来执行任务,而是直接走拒绝策略。FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。

keepAliveTime设置为0L,表示空闲的线程会立刻终止。

workQueue则是使用LinkedBlockingQueue,但是没有设置范围,那么则是最大值(Integer.MAX_VALUE),这基本就相当于一个无界队列了。使用该“无界队列”则会带来哪些影响呢?当线程池中的线程数量等于corePoolSize 时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,当阻塞队列也满了之后,则线程池会新建线程执行任务直到maximumPoolSize。由于FixedThreadPool使用的是“无界队列”LinkedBlockingQueue,那么maximumPoolSize参数无效,同时指定的拒绝策略AbortPolicy也将无效。而且该线程池也不会拒绝提交的任务,如果客户端提交任务的速度快于任务的执行,那么keepAliveTime也是一个无效参数。

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor,即单线程工作模式。

作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池 。

CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,这就意味着所有的任务一提交就会加入到阻塞队列中。keepAliveTime这是为60L,unit设置为TimeUnit.SECONDS,意味着空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。阻塞队列采用的SynchronousQueue,我们了解到SynchronousQueue是一个没有元素的阻塞队列(概念上的无界,其实是有界的),加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,这样就会存在一个问题,如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题

ScheduledThreadPool

ScheduledThreadPool,创建一个可延迟调度的线程池。

ScheduledThreadPool的线程池处理类为ScheduledThreadPoolExecutor,通过构造我们可以得出corePool是可以自定义的,maximumPoolSize为Integer.MAX_VALUE,这就意味着所有的任务一提交就会加入到阻塞队列中。keepAliveTime为0,unit设置为TimeUnit.NANOSECONDS,意味着执行完延迟任务后会立马回收线程。阻塞队列采用DelayedWorkQueue(专为延迟队列而生),DelayedWorkQueue基于堆的数据结构,默认初始化容量为16。

我们知道Timer与TimerTask虽然可以实现线程的周期和延迟调度,但是Timer与TimerTask存在一些缺陷,所以对于这种定期、周期执行任务的调度策略,我们一般都是推荐ScheduledThreadPoolExecutor来实现。

具体分析请阅读大牛博文:

WorkStealingPool

WorkStealingPool是个特殊的线程池,但不长用,面试有时候可能会问到,内部实现原理其实是基于JDK1.7的ForkJoinPool框架,具体构造实现细节如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}

ForkJoinPool框架是个并行框架,在多核CPU中有着高效的性能体验。Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。有关于ForkJoinPool知识,我们后续讲解。

参考资料

支付宝打赏 微信打赏

请作者喝杯咖啡吧