bigbully +

我所知道的线程池

我所知道的线程池

线程池其实或多或少都用过,不过这是我第一次阅读它的源码,包括源码附带的非常详尽的注释。发现我之前对于线程池的理解还是很浅薄的。

其实从ThreadPoolExecutor.java顶部200多行的注释就能一定程度上了解线程池的用法了。

首先看一下线程池的初始化方式:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,long keepAliveTime,                   TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 

参数说明 - corePoolSize 核心线程数:提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 - maximumPoolSize 最大线程数:线程池允许存活的线程数的最大值。核心线程数不能超过最大线程数。 - keepAliveTime 非核心线程数的存活时间:如果核心线程数小于最大线程数,那么核心线程之外的那些线程当存活超过keepAliveTime,就会被终止。 - unit 存活时间的单位 - workQueue 线程池使用的队列 - threadFactory 线程工厂 - handler 当线程池拒绝提交新的任务时使用的策略。

默认构造方法的参数着实不少,一开始我根本无法理解为什么需要这么多初始化参数,不过在源码的注释中,针对不同的场景如何设置线程池参数的示例。

首先需要了解的是corePoolSize,maximumPoolSize,workQueue,handler四者之间的关系。

image

依照这个关系图,在源码注释中给出了3种推荐策略:

推荐策略

  • 直接的任务传递:最大线程数设为Integer.MAX_VALUE,workQueue使用SynchronousQueue,因为SynchronousQueue自身特性不保留任何元素,所以当核心线程都在使用时,任何提交的任务都会创建新的非核心线程。这种方式任务处理的吞吐量最大,不过会消耗更多地资源。
  • 无界队列:可以使用LinkedBlockingQueue这种无界队列,这时最大线程数这个参数不再有意义。当任务提交频率较平缓时,且所有提交的任务之间都彼此独立时使用这种策略。不过需要担心一下内存溢出的问题。
  • 有界队列:例如使用ArrayBlockingQueue,会根据队列的size和最大线程数来调整线程池对资源的使用和吞吐量的高低。

接下来从源码层面来研究一下线程池的实现。

image

ThreadPoolExecutor自身的层级结构如图所示。最上层的执行器Executor接口本身只具备void execute(Runnable command)的能力,而ExecutorService作为一个service又必须具备提交任务、正常关闭等职能,在AbstractExecutorService实现了一些公共方法。ThreadPoolExecutor作为线程池的基础实现,其下还有若干具备特殊功能的线程池继承自ThreadPoolExecutor。

不过这篇笔记只研究ThreadPoolExecutor的实现,其他不同类别的特殊线程池的学习放在之后进行。

ThreadPoolExecutor中也涉及到一些和AbstractQueuedSynchronizer相关的操作,AbstractQueuedSynchronizer及其各种实现类可以我在并发的灵魂中有详尽的解析,在此只简单描述它的功能。

作为一个Executor的实现类,自然要从execute方法说起:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//如果工作线程数小于核心线程数
        if (addWorker(command, true))//创建worker,第二个参数为true表示创建核心线程
            return;//创建成功则返回
        c = ctl.get();//创建失败获取保存的ctl
    }
    if (isRunning(c) && workQueue.offer(command)) {//判断ctl中的状态,如果线程池处于RUNNING状态,则把任务放入队列中
        int recheck = ctl.get();//重新确认状态
        if (! isRunning(recheck) && remove(command))//如果线程池已关闭,则移除任务
            reject(command);//移除成功调用RejectedExecutionHandler
        else if (workerCountOf(recheck) == 0)//如果remove失败
            addWorker(null, false);//自旋检查是否Q中所有任务都已经处理完成
    }//如果线程池已经关闭或队列已满
    else if (!addWorker(command, false))//尝试创建worker,并使用非核心线程
        reject(command);//创建失败调用RejectedExecutionHandler
}

这方法就涉及到ctl属性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

用一个支持并发的原子Int类型来保存两个属性:工作线程数和当前线程池状态。由于两个属性共享一个原子Int类型,所以线程池共支持(229)-1,大于5亿个线程。线程池的状态共分为一下五种:

以上五种状态从上至下一次发生状态改变,唯一的特例在于如果当前没有task可以处理,那么RUNNING状态会直接转变为STOP状态。

这个int类型共32位,保留最左3位用来保存5个状态值,右边29位保存工作线程数,所以以下几个方法(rs表示runState,wc表示workerCount)也就非常好理解了。

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

回到execute方法的源码中,从源码可以看出之前流程图表示的逻辑。详细流程这次在源码中以注释方式标明。

再来详细看一下addWorker方法,终于有机会在jdk源代码中见识一下java中的“goto”了!:

//方法接收两个参数,第一个参数是首个任务,可以为空,第二个参数表示是否创建在核心线程中
private boolean addWorker(Runnable firstTask, boolean core) {
    //以下是两层自旋,为了能从内层自旋直接跳到外层自旋,所以在这里设置retry标示
    retry:
    for (;;) {
        int c = ctl.get();//首先获得ctl的引用
        int rs = runStateOf(c);//计算出当前线程池的状态

        // Check if queue empty only if necessary.
        //以下需要判断如果线程池处于非开启状态,并且队列仍然未清空时会进行自旋
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;//如果线程池关闭后队列清空,则成功返回

        //这里是内层自旋
        for (;;) {
            int wc = workerCountOf(c);//首先得到当前worker的数量,也就是池中线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))//如果超标
                return false;//直接返回失败
            if (compareAndIncrementWorkerCount(c))//没超标则cas方式增加work数
                break retry;//成功后直接退出外层自旋
            c = ctl.get();  // Re-read ctl//如果失败的话需要reload ctl
            if (runStateOf(c) != rs)//如果当前状态已经改变,则重新自旋
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            //如果状态没改变的话,也许有其他任务同时提交,内层自旋
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;//在增减Worker的时候使用锁而不使用并发容器官方的解释时,当线程池shutdown的时候为了避免线程中断的高峰,加锁后可以变为串行执行
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);//double check

                if (rs < SHUTDOWN || //如果线程池正在运行
                    (rs == SHUTDOWN && firstTask == null)) { //或是shutdown后调用firstTask为null的时候
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)//largestPoolSize用来表示线程池中曾经到达过的最大线程数
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//线程启动在这里
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)//只有在线程池处于非运行状态下才会添加失败
            addWorkerFailed(w);//这时除了会从works中移除work之外,还会触发tryTerminate操作,tryTerminate会在任何发现当前线程池已经关闭的时候触发,不只在执行shutdown操作的时候触发。
    }
    return workerStarted;
}

addWorker方法就是这样。如果用过线程池就会发现,提交任务还有一种带返回值的方式,也就是submit,它是怎么做的呢?答案在AbstractExecutorService这个基类中。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}


public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

提供了三种submit方式,其实都是构造成FutureTask,因为FutureTask实现了Runnable接口,可以直接调用上文提到的execute方法执行,最终把FutureTask对象返回给用户即可。

线程池还提供了invokeAll和invokeAny方法,用来批量提交任务,两个方法都是阻塞执行的,区别在于,invokeAll方法只有当所有任务都执行完之后才返回结果集,而invokeAny方法只要有一个任务执行完成了,就把结果返回,并取消其他未执行完成的任务。invokeAll和invokeAny都提供阻塞和带超时时间的方法。如下所示:

 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

下面来分别看看这两种方法是如何实现的。invokeAll方法:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}

这方法几乎不用解释,只是按提交任务的顺序,循环遍历任务列表等待任务结果,这过程中不响应任何异常,除非任务被取消,当有任务被取消时所有任务都依次被取消,抛出InterruptedException异常。

带超时时间的invokeAll方法其实也很简单,只不过在每次提交任务判断是否已经超时,获得futureTask结果时使用 f.get(nanos, TimeUnit.NANOSECONDS),当出现超时异常时取消所有任务,并返回结果集。

invokeAny是否包含超时时间的两个方法实际上都调用的时下面这个doInvokeAny方法,这个方法我以注释的方式加以分析:

//第二个参数用来标识是否有超时时间
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                        boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();//这里会记录待提交任务数
    if (ntasks == 0)
        throw new IllegalArgumentException();
    List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);//创建一个用来记录任务结果的集合
    //这里构造了一个线程池的包装类,实现很简单,当任务执行完成后,会放入一个FIFO的队列,所以这个队里是以任务的完成先后来排序的,这一点非常重要
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        //这里记录任务运行失败的最后一次异常信息,如果所有任务都失败了,则会抛出这个异常。
        ExecutionException ee = null;
        long lastTime = timed ? System.nanoTime() : 0;
        Iterator<? extends Callable<T>> it = tasks.iterator();//获得所有任务的迭代器

        futures.add(ecs.submit(it.next()));//执行第一个任务并加入结果集合
        --ntasks;//待提交任务会减少
        int active = 1;//正在进行的任务会增加

        for (;;) {//接下来进入自旋
            Future<T> f = ecs.poll();//这里会返回最先完成的任务
            if (f == null) {//如果为空
                if (ntasks > 0) {//而且还有任务可以提交,则进行任务的提交
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)//如果没有任务可以提交,而且所有提交的任务都已经完成,则跳出自旋
                    break;
                else if (timed) {//如果没有任务可以提交,而且需要考虑超时时间,则在一定时间内等待有任务完成
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)//超时则抛出超时异常
                        throw new TimeoutException();
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;//获得任务后进行超时时间的计算
                }
                else//如果不需要考虑超时时间,则阻塞的等待第一个任务完成
                    f = ecs.take();
            }

            if (f != null) {//当有任务完成时
                --active;//正在进行的任务数会减少
                try {
                    return f.get();//如果能顺利获得结果,则直接返回,如果出现任何异常,则进入自旋,尝试等待下一个任务完成
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }
        //当所有任务都执行完成,且没有任何任务成功时,会抛出异常
        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {//如果某一个任务成功执行完毕,则取消所有任务
        for (Future<T> f : futures)
            f.cancel(true);
    }
}

以上就是线程池的批量执行方法的分析。

创建了Worker就需要执行,接下来看看Worker是如何执行的:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    final Thread thread;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    ...
}

Worker本身采用独占模式继承了AbstractQueuedSynchronizer,实现了Runnable接口。每个Worker对象又会包装一个thread,所以每个worker就变成了可重用的,而且又是独占式的。runWorker方法如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法会持续不断的从Queue中获得任务并执行,runWorker有以下几点需要注意:

  1. 可以看到: Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { ... } worker会首先尝试获得firstTask,如果firstTask不存在也没关系,worker会持续不断的通过getTask方法从Queue中获得下一个任务。如果getTask方法没有获得任何任务,worker会自动推出并执行processWorkerExit方法。
  2. 每次执行任务前会加锁,并且通过Thread.interrupted()清楚中断状态,保证只要线程池不关闭,任务肯定会继续执行。
  3. 任务执行前后会执行扩展方法beforeExecute(wt, task)和afterExecute(task, thrown),值得注意的是beforeExecute执行过程中抛出的异常不会被catch住,所以beforeExecute如果有异常抛出,任务不会执行。是所有RuntimeException会交给afterExecute处理最后抛出。值得注意的是由于Error不会强制被catch的特殊性,任何其他Throwable都会封装成Error,和Error一样在afterExecute处理过后抛出。

还记得线程池里设置了每个线程的存活时间吗,来看看getTask方法,具体逻辑解释以注释方式给出:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //每次尝试获取任务之前
        //都首先会判断一下是否需要获取任务,如果当前线程池已经开始关闭,或者队列为空就不用获取了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();//这时设置worker数减1,这个线程就要关闭了
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池允许线程  timeout或者当前线程数大于核心线程数,则会进行timeout的处理

            //如果线程数小于最大线程数,并且不需要做timeout判断则直接跳出
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))//否则,也就是已经timeout了,通过cas削减worker数,并返回null意味着线程即将关闭
                return null;
            //如果cas削减worker数失败
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)//double check一下当前状态是不是被其他线程改动过
                continue retry;//如果改动过,则重新执行以上逻辑
            //如果状态没改动过,cas设置失败应该是由于worker数被其他线程修改过,在内部自旋重新判断
        }

        //如果确实需要获取任务
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)//成功获取后返回
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

有开始就有结束,接下来看看worker怎么终结自己的,当runWorker跳出while循环后会执行finally中的processWorkerExit:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

在worker关闭之前首先要判断completedAbruptly,这个参数为true,则表明worker是由于执行任务时抛出异常才无奈关闭,如果为false,则是由于没有新的任务可以获取,空闲时间太长而关闭。因为空闲而关闭在getTask方法中已经做了worker数削减的操作了,所以在这里不再削减。

这之后会在锁内完成从workers中删除worker的操作,接下来在有可能终止线程池的地方尝试终止。最后要根据当前线程池worker的数量以及核心线程数,Queue中剩余任务数来决定是否创建新的worker。

worker从创建,工作,到销毁的整个生命周期就分析完了。

再来看看线程池是如何关闭的:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdown方法是优雅关闭的方法,所有已经提交的任务会正常执行,关闭之后提交的任务不会被接受。可以看到所有关闭操作都是在mainLock中进行的。

首先会通过SecurityManager来判断是否允许checkShutdownAccess。如果允许或是SecurityManager没有设置。advanceRunState方法如下:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

这里会自旋判断当前状态,并通过CAS设置状态为关闭,如果有其他线程同时改变了状态导致设置状态失败,则会进入自旋,直到状态设置成功为止。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

之后调用interruptIdleWorkers方法中断所有空闲的线程。因为每一个worker都包含一个独占锁,worker执行任务时会持有这个独占锁,所以这里只会中断没有在工作的worker。

shutdown方法接下来会执行一个扩展点onShutdown,最后执行tryTerminate尝试关闭线程池。tryTerminate方法如下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

这个方法进入自旋之后首先会判断当前状态是否可以终止,例如如果是running状态则无需执行终止。

之后的这段逻辑略显晦涩:

 if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

总的来说就是如果当前worker仍然存活,就尝试中断任何一个空闲的worker,这里只中断一个worker,为的是让这个worker在终止之前执行processWorkerExit,这样会再次调用tryTerminate方法,这样就把关闭的信息传播下去,达到优雅关闭的目的。

在tryTerminate的最后会在锁内用CAS把状态置为TIDYING,执行扩展方法terminated(),之后状态被设为TERMINATED,并signal所有等待线程池关闭的线程。在第一次执行CAS操作失败就会进入自旋,知道成功为止。

除了优雅关闭之外,还有强制关闭的方法:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

这个方法和优雅关闭shutdown方法前半段都差不多,只不过最后会中断所有worker,并把仍然在Queue中等待执行的任务打包返回给用户。

了解完了最基础的线程池之后,来看看各种各样具备特殊职能的线程池。

ScheduledThreadPoolExecutor

接下来关注一下ScheduledThreadPoolExecutor,这个线程池继承了ThreadPoolExecutor可以执行定时任务,也就是类似timer的作用不过由于是线程池,可以一次执行多个任务,节约线程资源。

一般不会直接使用ScheduledThreadPoolExecutor的构造函数,因为Executors工具类提供了更便捷的使用方式:

Executors.newSingleThreadScheduledExecutor();
Executors.newScheduledThreadPool(1);

这个方法创建了一个

class Executors...

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;
    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }
    public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
        return e.schedule(command, delay, unit);
    }
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return e.schedule(callable, delay, unit);
    }
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
        return e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}

由于声明的是ScheduledExecutorService接口,这样就不会暴露出线程池自身的一些高级方法。而且,作为处理定时任务的线程池,任务数一般不会太多,所以没有必要使用一个以上的线程。

ScheduledThreadPoolExecutor提供了4个构造函数,分别可以设定corePoolSize,threadFactory,rejectedExecutionHandler。唯独不能设定阻塞队列,因为ScheduledThreadPoolExecutor必须使用阻塞队列DelayedWorkQueue,不过有一点不同导致他没有继承DelayQueue,而是重写一个,因为定时任务的业务逻辑导致不得不在堆中记录每个任务的index。这样做有以下几点好处:

  1. 当取消任务时把查找的时间复杂度从O(n)降为O(log n)
  2. 当任务在堆内移动时提升了gc效果

尝试向线程池提交一个任务,我们可以选择延迟一段时间再执行:

pool.schedule(new Callable()[T]{
    ...
}, 10L, TimeUnit.SECONDS)

这个方法实现如下:

class ScheduledThreadPoolExecutor...

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    if (callable == null || unit == null)//判断非空
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//使用callable和触发任务执行时间构造RunnableScheduledFuture对象
    delayedExecute(t);//延迟执行
    return t;
}

重点看看delayedExecute方法:

class ScheduledThreadPoolExecutor...

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown()) //如果关闭,则使用handler拒绝任务
        reject(task);
    else { //如果一切正常
        super.getQueue().add(task); //把task添加到延迟队列
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) && remove(task))//再次判断如果已经shutdown,如果不需要在关闭后仍然执行周期任务,则把任务移除
            task.cancel(false);//任务取消
        else //如果一切正常,保证线程池存在至少一个线程
            ensurePrestart();
    }
}