logo头像
Snippet 博客主题

【Java并发编程实战】-深入ThreadPoolExecutor工作机制

本文于517天之前发表,文中内容可能已经过时

为了高效的管理线程,JDK1.5版本的Doug Lea大神为我们设计了大名鼎鼎的ThreadPoolExecutor。

作为Executor框架中最核心的类-ThreadPoolExecutor,我们可以认为它是一个调度工具,为了高效执行或调度任务,我们非常有必要对该类的工作原理以及内部机制了解一番。线程池的每一个参数配置都有一定的设计合理性,各个参数配置协同聚合,让资源合理运用才是我们最关注的。

线程池的生命周期

线程有五种状态:新建,就绪,运行,阻塞,死亡。线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

我们翻开ThreadPoolExecutor源码可以看到线程池生命周期的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 对应的高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
// 对应的高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 对应的高3位值是001
private static final int STOP = 1 << COUNT_BITS;
// 对应的高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
// 对应的高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY;}
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

变量ctl定义为AtomicInteger ,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示”线程池状态(runState)”,低29位表示”线程池中的任务数量(workerCnt)”。

线程池是有状态的,不同状态下线程池的行为是不一样的。具体运转流程如下:

线程池周期图

状态 含义
RUNNING 运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务执行, shutdown 方法可进入 SHUTDOWN 状态;执行 shutdownNow 方法可进入 STOP 状态
SHUTDOWN 待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务;当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态
STOP 停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务;当工作线程数为0时,进入 TIDYING 状态
TIDYING 整理状态,此时任务都已经执行完毕(ctl记录的”任务数量”为0),并且也没有工作线程; 执行 terminated 方法后进入 TERMINATED 状态
TERMINATED 终止状态,此时线程池完全终止了,并完成了所有资源的释放

线程池的内部构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

上述源码是摘自JDK1.8中ThreadPoolExecutor的构造源码,构造器有7个重要参数,这七个参数意义重大,我们有必要深入学习。

前文讲到有了标志工作线程的个数的变量ctl,那么不同硬件配置的机器,到底应该配置多少个线程才合适呢?线程池配置的核心线程数少了,发挥不了线程池的特性,线程池配置的核心线程数多了,又造成资源耗费。

现在我们有一个疑问,既然已经有了标识工作线程的个数的变量了,为什么还要有核心线程数、最大线程数呢?

其实你这样想就能够理解了,创建线程是有代价的,不能每次要执行一个任务时就创建一个线程,但是也不能在任务非常多的时候,只有少量的线程在执行,这样任务是来不及处理的,而是应该创建合适的足够多的线程来及时的处理任务。随着任务数量的变化,当任务数明显很小时,原本创建的多余的线程就没有必要再存活着了,因为这时使用少量的线程就能够处理的过来了,所以说真正工作的线程的数量,是随着任务的变化而变化的。

那核心线程数和最大线程数与工作线程个数的关系是什么呢?

工作线程的个数可能从0到最大线程数之间变化,当执行一段时间之后可能维持在 corePoolSize,但也不是绝对的,取决于核心线程是否允许被超时回收。

corePoolSize核心线程数

corePoolSize 用来表示线程池中的核心线程的数量,也可以称为可闲置的线程数量。当提交一个任务时,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize最大线程数

maximumPoolSize 用来表示线程池中最多能够创建的线程数量。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了。

keepAliveTime线程空闲时间

线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。

当工作线程数达到 corePoolSize 时,线程池会将新接收到的任务存放在阻塞队列中,而阻塞队列又两种情况:一种是有界的队列,一种是无界的队列。

如果是无界队列,那么当核心线程都在忙的时候,所有新提交的任务都会被存放在该无界队列中,这时最大线程数将变得没有意义,因为阻塞队列不会存在被装满的情况。

如果是有界队列,那么当阻塞队列中装满了等待执行的任务,这时再有新任务提交时,线程池就需要创建新的“临时”线程来处理,相当于增派人手来处理任务。

但是创建的“临时”线程是有存活时间的,不可能让他们一直都存活着,当阻塞队列中的任务被执行完毕,并且又没有那么多新任务被提交时,“临时”线程就需要被回收销毁,在被回收销毁之前等待的这段时间,就是非核心线程的存活时间,也就是 keepAliveTime 属性。

那么什么是“非核心线程”呢?是不是先创建的线程就是核心线程,后创建的就是非核心线程呢?

其实核心线程跟创建的先后没有关系,而是跟工作线程的个数有关,如果当前工作线程的个数大于核心线程数,那么所有的线程都可能是“非核心线程”,都有被回收的可能。

一个线程执行完了一个任务后,会去阻塞队列里面取新的任务,在取到任务之前它就是一个闲置的线程。

取任务的方法有两种,一种是通过 take() 方法一直阻塞直到取出任务,另一种是通过 poll(keepAliveTime,timeUnit) 方法在一定时间内取出任务或者超时,如果超时这个线程就会被回收,请注意核心线程一般不会被回收。

那么怎么保证核心线程不会被回收呢?还是跟工作线程的个数有关,每一个线程在取任务的时候,线程池会比较当前的工作线程个数与核心线程数:

  • 如果工作线程数小于当前的核心线程数,则使用第一种方法取任务,也就是没有超时回收,这时所有的工作线程都是“核心线程”,他们不会被回收;
  • 如果大于核心线程数,则使用第二种方法取任务,一旦超时就回收,所以并没有绝对的核心线程,只要这个线程没有在存活时间内取到任务去执行就会被回收。

所以每个线程想要保住自己“核心线程”的身份,必须充分努力,尽可能快的获取到任务去执行,这样才能逃避被回收的命运。

核心线程一般不会被回收,但是也不是绝对的,如果我们设置了允许核心线程超时被回收的话,那么就没有核心线程这种说法了,所有的线程都会通过 poll(keepAliveTime, timeUnit) 来获取任务,一旦超时获取不到任务,就会被回收,一般很少会这样来使用,除非该线程池需要处理的任务非常少,并且频率也不高,不需要将核心线程一直维持着。

unit线程空闲时间单位

keepAliveTime的单位。TimeUnit,可设置时分秒毫秒

workQueue任务队列

workQueue用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。我们可以选择如下几种:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
  • LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
  • PriorityBlockingQueue:具有优先界别的阻塞队列。

上面我们说了核心线程数和最大线程数,并且也介绍了工作线程的个数是在0和最大线程数之间变化的。但是不可能一下子就创建了所有线程,把线程池装满,而是有一个过程,这个过程是这样的:

当线程池接收到一个任务时,如果工作线程数没有达到corePoolSize,那么就会新建一个线程,并绑定该任务,直到工作线程的数量达到 corePoolSize 前都不会重用之前的线程。

当工作线程数达到 corePoolSize 了,这时又接收到新任务时,会将任务存放在一个阻塞队列中等待核心线程去执行。为什么不直接创建更多的线程来执行新任务呢,原因是核心线程中很可能已经有线程执行完自己的任务了,或者有其他线程马上就能处理完当前的任务,并且接下来就能投入到新的任务中去,所以阻塞队列是一种缓冲的机制,给核心线程一个机会让他们充分发挥自己的能力。另外一个值得考虑的原因是,创建线程毕竟是比较昂贵的,不可能一有任务要执行就去创建一个新的线程。

所以我们需要为线程池配备一个阻塞队列,即任务列表,用来临时缓存任务,这些任务将等待工作线程来执行。

threadFactory线程工厂

既然是线程池,那自然少不了线程,线程该如何来创建呢?这个任务就交给了线程工厂 ThreadFactory 来完成。

该对象可以通过Executors.defaultThreadFactory(),如下:

1
2
3
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}

返回的DefaultThreadFactory对象,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

ThreadFactory的默认实现DefaultThreadFactory就是提供创建线程功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

handler拒绝策略

虽然我们有了阻塞队列来对任务进行缓存,这从一定程度上为线程池的执行提供了缓冲期,但是如果是有界的阻塞队列,那就存在队列满的情况,也存在工作线程的数据已经达到最大线程数的时候。如果这时候再有新的任务提交时,显然线程池已经心有余而力不足了,因为既没有空余的队列空间来存放该任务,也无法创建新的线程来执行该任务了,所以这时我们就需要有一种拒绝策略,即 handler。

RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。

线程池提供了四种拒绝策略:

  • AbortPolicy:直接抛出异常,默认策略;
  • CallerRunsPolicy:用调用者所在的线程来执行任务;
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardPolicy:直接丢弃任务;

当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。

线程池的工作流程

了解了线程池中所有的重要属性之后,现在我们需要来了解下线程池的工作流程了。

线程池工作流程

上图是一张线程池工作的精简图,实际的过程比这个要复杂的多,不过这些应该能够完全覆盖到线程池的整个工作流程了。

整个过程可以拆分成以下几个部分:

提交任务

线程池提交任务流程图

当向线程池提交一个新的任务时,线程池有三种处理情况,分别是:创建一个工作线程来执行该任务、将任务加入阻塞队列、拒绝该任务。

提交任务的过程也可以拆分成以下几个部分:

  • 当工作线程数小于核心线程数时,直接创建新的核心工作线程

  • 当工作线程数不小于核心线程数时,就需要尝试将任务添加到阻塞队列中去

  • 如果能够加入成功,说明队列还没有满,那么需要做以下的二次验证来保证添加进去的任务能够成功被执行

    1. 验证当前线程池的运行状态,如果是非RUNNING状态,则需要将任务从阻塞队列中移除,然后拒绝该任务

    2. 验证当前线程池中的工作线程的个数,如果为0,则需要主动添加一个空工作线程来执行刚刚添加到阻塞队列中的任务

  • 如果加入失败,则说明队列已经满了,那么这时就需要创建新的“临时”工作线程来执行任务

    1. 如果创建成功,则直接执行该任务

    2. 如果创建失败,则说明工作线程数已经等于最大线程数了,则只能拒绝该任务了

整个过程可以用下面这张图来表示:

创建工作线程

创建工作线程需要做一系列的判断,需要确保当前线程池可以创建新的线程之后,才能创建。

首先,当线程池的状态是 SHUTDOWN 或者 STOP 时,则不能创建新的线程。

另外,当线程工厂创建线程失败时,也不能创建新的线程。

还有就是当前工作线程的数量与核心线程数、最大线程数进行比较,如果前者大于后者的话,也不允许创建。

除此之外,会尝试通过 CAS 来自增工作线程的个数,如果自增成功了,则会创建新的工作线程,即 Worker 对象。

然后加锁进行二次验证是否能够创建工作线程,最后如果创建成功,则会启动该工作线程。

启动工作线程

当工作线程创建成功后,也就是 Worker 对象已经创建好了,这时就需要启动该工作线程,让线程开始干活了,Worker 对象中关联着一个 Thread,所以要启动工作线程的话,只要通过 worker.thread.start() 来启动该线程即可。

启动完了之后,就会执行 Worker 对象的 run 方法,因为 Worker 实现了 Runnable 接口,所以本质上 Worker 也是一个线程。

通过线程 start 开启之后就会调用到 Runnable 的 run 方法,在 worker 对象的 run 方法中,调用了 runWorker(this) 方法,也就是把当前对象传递给了 runWorker 方法,让他来执行。

获取任务并执行

在 runWorker 方法被调用之后,就是执行具体的任务了,首先需要拿到一个可以执行的任务,而 Worker 对象中默认绑定了一个任务,如果该任务不为空的话,那么就是直接执行。

执行完了之后,就会去阻塞队列中获取任务来执行,而获取任务的过程,需要考虑当前工作线程的个数。

  • 如果工作线程数大于核心线程数,那么就需要通过 poll 来获取,因为这时需要对闲置的线程进行回收;
  • 如果工作线程数小于等于核心线程数,那么就可以通过 take 来获取了,因此这时所有的线程都是核心线程,不需要进行回收,前提是没有设置 allowCoreThreadTimeOut

本篇基本笔者都是对线程池的基本模型以及内部概念进行分析,说实话,写完之后读了一遍,比较抽象难理解,下篇我将以JDK源码进行导读,继续探究线程池的工作原理,敬请期待。。。

参考资料

支付宝打赏 微信打赏

请作者喝杯咖啡吧