我所知道的线程池
2014-10-20
我所知道的线程池
线程池其实或多或少都用过,不过这是我第一次阅读它的源码,包括源码附带的非常详尽的注释。发现我之前对于线程池的理解还是很浅薄的。
其实从ThreadPoolExecutor.java顶部200多行的注释就能一定程度上了解线程池的用法了。
首先看一下线程池的初始化方式:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
参数说明 - corePoolSize 核心线程数:提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 - maximumPoolSize 最大线程数:线程池允许存活的线程数的最大值。核心线程数不能超过最大线程数。 - keepAliveTime 非核心线程数的存活时间:如果核心线程数小于最大线程数,那么核心线程之外的那些线程当存活超过keepAliveTime,就会被终止。 - unit 存活时间的单位 - workQueue 线程池使用的队列 - threadFactory 线程工厂 - handler 当线程池拒绝提交新的任务时使用的策略。
默认构造方法的参数着实不少,一开始我根本无法理解为什么需要这么多初始化参数,不过在源码的注释中,针对不同的场景如何设置线程池参数的示例。
首先需要了解的是corePoolSize,maximumPoolSize,workQueue,handler四者之间的关系。
依照这个关系图,在源码注释中给出了3种推荐策略:
推荐策略
- 直接的任务传递:最大线程数设为Integer.MAX_VALUE,workQueue使用SynchronousQueue,因为SynchronousQueue自身特性不保留任何元素,所以当核心线程都在使用时,任何提交的任务都会创建新的非核心线程。这种方式任务处理的吞吐量最大,不过会消耗更多地资源。
- 无界队列:可以使用LinkedBlockingQueue这种无界队列,这时最大线程数这个参数不再有意义。当任务提交频率较平缓时,且所有提交的任务之间都彼此独立时使用这种策略。不过需要担心一下内存溢出的问题。
- 有界队列:例如使用ArrayBlockingQueue,会根据队列的size和最大线程数来调整线程池对资源的使用和吞吐量的高低。
接下来从源码层面来研究一下线程池的实现。
ThreadPoolExecutor自身的层级结构如图所示。最上层的执行器Executor接口本身只具备void execute(Runnable command)的能力,而ExecutorService作为一个service又必须具备提交任务、正常关闭等职能,在AbstractExecutorService实现了一些公共方法。ThreadPoolExecutor作为线程池的基础实现,其下还有若干具备特殊功能的线程池继承自ThreadPoolExecutor。
不过这篇笔记只研究ThreadPoolExecutor的实现,其他不同类别的特殊线程池的学习放在之后进行。
ThreadPoolExecutor中也涉及到一些和AbstractQueuedSynchronizer相关的操作,AbstractQueuedSynchronizer及其各种实现类可以我在并发的灵魂中有详尽的解析,在此只简单描述它的功能。
作为一个Executor的实现类,自然要从execute方法说起:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//如果工作线程数小于核心线程数
if (addWorker(command, true))//创建worker,第二个参数为true表示创建核心线程
return;//创建成功则返回
c = ctl.get();//创建失败获取保存的ctl
}
if (isRunning(c) && workQueue.offer(command)) {//判断ctl中的状态,如果线程池处于RUNNING状态,则把任务放入队列中
int recheck = ctl.get();//重新确认状态
if (! isRunning(recheck) && remove(command))//如果线程池已关闭,则移除任务
reject(command);//移除成功调用RejectedExecutionHandler
else if (workerCountOf(recheck) == 0)//如果remove失败
addWorker(null, false);//自旋检查是否Q中所有任务都已经处理完成
}//如果线程池已经关闭或队列已满
else if (!addWorker(command, false))//尝试创建worker,并使用非核心线程
reject(command);//创建失败调用RejectedExecutionHandler
}
这方法就涉及到ctl属性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
用一个支持并发的原子Int类型来保存两个属性:工作线程数和当前线程池状态。由于两个属性共享一个原子Int类型,所以线程池共支持(229)-1,大于5亿个线程。线程池的状态共分为一下五种:
- RUNNING
- SHUTDOWN 这个状态不再接收新的task,不过当前正在处理队列中的task
- STOP 这个状态不再接受新的task,并不在处理队列中的task
- TIDYING 这个状态所有的task都已经执行完毕,工作线程数降为0,不过正在执行terminated()方法
- TERMINATED 这个状态表示terminated()方法已经执行完毕
以上五种状态从上至下一次发生状态改变,唯一的特例在于如果当前没有task可以处理,那么RUNNING状态会直接转变为STOP状态。
这个int类型共32位,保留最左3位用来保存5个状态值,右边29位保存工作线程数,所以以下几个方法(rs表示runState,wc表示workerCount)也就非常好理解了。
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
回到execute方法的源码中,从源码可以看出之前流程图表示的逻辑。详细流程这次在源码中以注释方式标明。
再来详细看一下addWorker方法,终于有机会在jdk源代码中见识一下java中的“goto”了!:
//方法接收两个参数,第一个参数是首个任务,可以为空,第二个参数表示是否创建在核心线程中
private boolean addWorker(Runnable firstTask, boolean core) {
//以下是两层自旋,为了能从内层自旋直接跳到外层自旋,所以在这里设置retry标示
retry:
for (;;) {
int c = ctl.get();//首先获得ctl的引用
int rs = runStateOf(c);//计算出当前线程池的状态
// Check if queue empty only if necessary.
//以下需要判断如果线程池处于非开启状态,并且队列仍然未清空时会进行自旋
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;//如果线程池关闭后队列清空,则成功返回
//这里是内层自旋
for (;;) {
int wc = workerCountOf(c);//首先得到当前worker的数量,也就是池中线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//如果超标
return false;//直接返回失败
if (compareAndIncrementWorkerCount(c))//没超标则cas方式增加work数
break retry;//成功后直接退出外层自旋
c = ctl.get(); // Re-read ctl//如果失败的话需要reload ctl
if (runStateOf(c) != rs)//如果当前状态已经改变,则重新自旋
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//如果状态没改变的话,也许有其他任务同时提交,内层自旋
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;//在增减Worker的时候使用锁而不使用并发容器官方的解释时,当线程池shutdown的时候为了避免线程中断的高峰,加锁后可以变为串行执行
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);//double check
if (rs < SHUTDOWN || //如果线程池正在运行
(rs == SHUTDOWN && firstTask == null)) { //或是shutdown后调用firstTask为null的时候
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//largestPoolSize用来表示线程池中曾经到达过的最大线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//线程启动在这里
workerStarted = true;
}
}
} finally {
if (! workerStarted)//只有在线程池处于非运行状态下才会添加失败
addWorkerFailed(w);//这时除了会从works中移除work之外,还会触发tryTerminate操作,tryTerminate会在任何发现当前线程池已经关闭的时候触发,不只在执行shutdown操作的时候触发。
}
return workerStarted;
}
addWorker方法就是这样。如果用过线程池就会发现,提交任务还有一种带返回值的方式,也就是submit,它是怎么做的呢?答案在AbstractExecutorService这个基类中。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
提供了三种submit方式,其实都是构造成FutureTask,因为FutureTask实现了Runnable接口,可以直接调用上文提到的execute方法执行,最终把FutureTask对象返回给用户即可。
线程池还提供了invokeAll和invokeAny方法,用来批量提交任务,两个方法都是阻塞执行的,区别在于,invokeAll方法只有当所有任务都执行完之后才返回结果集,而invokeAny方法只要有一个任务执行完成了,就把结果返回,并取消其他未执行完成的任务。invokeAll和invokeAny都提供阻塞和带超时时间的方法。如下所示:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
下面来分别看看这两种方法是如何实现的。invokeAll方法:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
这方法几乎不用解释,只是按提交任务的顺序,循环遍历任务列表等待任务结果,这过程中不响应任何异常,除非任务被取消,当有任务被取消时所有任务都依次被取消,抛出InterruptedException异常。
带超时时间的invokeAll方法其实也很简单,只不过在每次提交任务判断是否已经超时,获得futureTask结果时使用 f.get(nanos, TimeUnit.NANOSECONDS),当出现超时异常时取消所有任务,并返回结果集。
invokeAny是否包含超时时间的两个方法实际上都调用的时下面这个doInvokeAny方法,这个方法我以注释的方式加以分析:
//第二个参数用来标识是否有超时时间
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();//这里会记录待提交任务数
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);//创建一个用来记录任务结果的集合
//这里构造了一个线程池的包装类,实现很简单,当任务执行完成后,会放入一个FIFO的队列,所以这个队里是以任务的完成先后来排序的,这一点非常重要
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
//这里记录任务运行失败的最后一次异常信息,如果所有任务都失败了,则会抛出这个异常。
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();//获得所有任务的迭代器
futures.add(ecs.submit(it.next()));//执行第一个任务并加入结果集合
--ntasks;//待提交任务会减少
int active = 1;//正在进行的任务会增加
for (;;) {//接下来进入自旋
Future<T> f = ecs.poll();//这里会返回最先完成的任务
if (f == null) {//如果为空
if (ntasks > 0) {//而且还有任务可以提交,则进行任务的提交
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)//如果没有任务可以提交,而且所有提交的任务都已经完成,则跳出自旋
break;
else if (timed) {//如果没有任务可以提交,而且需要考虑超时时间,则在一定时间内等待有任务完成
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)//超时则抛出超时异常
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;//获得任务后进行超时时间的计算
}
else//如果不需要考虑超时时间,则阻塞的等待第一个任务完成
f = ecs.take();
}
if (f != null) {//当有任务完成时
--active;//正在进行的任务数会减少
try {
return f.get();//如果能顺利获得结果,则直接返回,如果出现任何异常,则进入自旋,尝试等待下一个任务完成
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//当所有任务都执行完成,且没有任何任务成功时,会抛出异常
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {//如果某一个任务成功执行完毕,则取消所有任务
for (Future<T> f : futures)
f.cancel(true);
}
}
以上就是线程池的批量执行方法的分析。
创建了Worker就需要执行,接下来看看Worker是如何执行的:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
...
}
Worker本身采用独占模式继承了AbstractQueuedSynchronizer,实现了Runnable接口。每个Worker对象又会包装一个thread,所以每个worker就变成了可重用的,而且又是独占式的。runWorker方法如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法会持续不断的从Queue中获得任务并执行,runWorker有以下几点需要注意:
- 可以看到: Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { ... } worker会首先尝试获得firstTask,如果firstTask不存在也没关系,worker会持续不断的通过getTask方法从Queue中获得下一个任务。如果getTask方法没有获得任何任务,worker会自动推出并执行processWorkerExit方法。
- 每次执行任务前会加锁,并且通过Thread.interrupted()清楚中断状态,保证只要线程池不关闭,任务肯定会继续执行。
- 任务执行前后会执行扩展方法beforeExecute(wt, task)和afterExecute(task, thrown),值得注意的是beforeExecute执行过程中抛出的异常不会被catch住,所以beforeExecute如果有异常抛出,任务不会执行。是所有RuntimeException会交给afterExecute处理最后抛出。值得注意的是由于Error不会强制被catch的特殊性,任何其他Throwable都会封装成Error,和Error一样在afterExecute处理过后抛出。
还记得线程池里设置了每个线程的存活时间吗,来看看getTask方法,具体逻辑解释以注释方式给出:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//每次尝试获取任务之前
//都首先会判断一下是否需要获取任务,如果当前线程池已经开始关闭,或者队列为空就不用获取了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//这时设置worker数减1,这个线程就要关闭了
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池允许线程 timeout或者当前线程数大于核心线程数,则会进行timeout的处理
//如果线程数小于最大线程数,并且不需要做timeout判断则直接跳出
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))//否则,也就是已经timeout了,通过cas削减worker数,并返回null意味着线程即将关闭
return null;
//如果cas削减worker数失败
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//double check一下当前状态是不是被其他线程改动过
continue retry;//如果改动过,则重新执行以上逻辑
//如果状态没改动过,cas设置失败应该是由于worker数被其他线程修改过,在内部自旋重新判断
}
//如果确实需要获取任务
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//成功获取后返回
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
有开始就有结束,接下来看看worker怎么终结自己的,当runWorker跳出while循环后会执行finally中的processWorkerExit:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
在worker关闭之前首先要判断completedAbruptly,这个参数为true,则表明worker是由于执行任务时抛出异常才无奈关闭,如果为false,则是由于没有新的任务可以获取,空闲时间太长而关闭。因为空闲而关闭在getTask方法中已经做了worker数削减的操作了,所以在这里不再削减。
这之后会在锁内完成从workers中删除worker的操作,接下来在有可能终止线程池的地方尝试终止。最后要根据当前线程池worker的数量以及核心线程数,Queue中剩余任务数来决定是否创建新的worker。
worker从创建,工作,到销毁的整个生命周期就分析完了。
再来看看线程池是如何关闭的:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown方法是优雅关闭的方法,所有已经提交的任务会正常执行,关闭之后提交的任务不会被接受。可以看到所有关闭操作都是在mainLock中进行的。
首先会通过SecurityManager来判断是否允许checkShutdownAccess。如果允许或是SecurityManager没有设置。advanceRunState方法如下:
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
这里会自旋判断当前状态,并通过CAS设置状态为关闭,如果有其他线程同时改变了状态导致设置状态失败,则会进入自旋,直到状态设置成功为止。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
之后调用interruptIdleWorkers方法中断所有空闲的线程。因为每一个worker都包含一个独占锁,worker执行任务时会持有这个独占锁,所以这里只会中断没有在工作的worker。
shutdown方法接下来会执行一个扩展点onShutdown,最后执行tryTerminate尝试关闭线程池。tryTerminate方法如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
这个方法进入自旋之后首先会判断当前状态是否可以终止,例如如果是running状态则无需执行终止。
之后的这段逻辑略显晦涩:
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
总的来说就是如果当前worker仍然存活,就尝试中断任何一个空闲的worker,这里只中断一个worker,为的是让这个worker在终止之前执行processWorkerExit,这样会再次调用tryTerminate方法,这样就把关闭的信息传播下去,达到优雅关闭的目的。
在tryTerminate的最后会在锁内用CAS把状态置为TIDYING,执行扩展方法terminated(),之后状态被设为TERMINATED,并signal所有等待线程池关闭的线程。在第一次执行CAS操作失败就会进入自旋,知道成功为止。
除了优雅关闭之外,还有强制关闭的方法:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
这个方法和优雅关闭shutdown方法前半段都差不多,只不过最后会中断所有worker,并把仍然在Queue中等待执行的任务打包返回给用户。
了解完了最基础的线程池之后,来看看各种各样具备特殊职能的线程池。
ScheduledThreadPoolExecutor
接下来关注一下ScheduledThreadPoolExecutor,这个线程池继承了ThreadPoolExecutor可以执行定时任务,也就是类似timer的作用不过由于是线程池,可以一次执行多个任务,节约线程资源。
一般不会直接使用ScheduledThreadPoolExecutor的构造函数,因为Executors工具类提供了更便捷的使用方式:
Executors.newSingleThreadScheduledExecutor();
Executors.newScheduledThreadPool(1);
这个方法创建了一个
class Executors...
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
由于声明的是ScheduledExecutorService接口,这样就不会暴露出线程池自身的一些高级方法。而且,作为处理定时任务的线程池,任务数一般不会太多,所以没有必要使用一个以上的线程。
ScheduledThreadPoolExecutor提供了4个构造函数,分别可以设定corePoolSize,threadFactory,rejectedExecutionHandler。唯独不能设定阻塞队列,因为ScheduledThreadPoolExecutor必须使用阻塞队列DelayedWorkQueue,不过有一点不同导致他没有继承DelayQueue,而是重写一个,因为定时任务的业务逻辑导致不得不在堆中记录每个任务的index。这样做有以下几点好处:
- 当取消任务时把查找的时间复杂度从O(n)降为O(log n)
- 当任务在堆内移动时提升了gc效果
尝试向线程池提交一个任务,我们可以选择延迟一段时间再执行:
pool.schedule(new Callable()[T]{
...
}, 10L, TimeUnit.SECONDS)
这个方法实现如下:
class ScheduledThreadPoolExecutor...
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null)//判断非空
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//使用callable和触发任务执行时间构造RunnableScheduledFuture对象
delayedExecute(t);//延迟执行
return t;
}
重点看看delayedExecute方法:
class ScheduledThreadPoolExecutor...
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) //如果关闭,则使用handler拒绝任务
reject(task);
else { //如果一切正常
super.getQueue().add(task); //把task添加到延迟队列
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) && remove(task))//再次判断如果已经shutdown,如果不需要在关闭后仍然执行周期任务,则把任务移除
task.cancel(false);//任务取消
else //如果一切正常,保证线程池存在至少一个线程
ensurePrestart();
}
}