线程池是Java并发包中的重要部分,也是高并发程序必不可少的类库,但是线程池技术本身比较复杂,不同语言对其实现提供的抽象也不一样,所以本文以Java线程池为例,分析它的设计与实现,以及它所带我们的抽象。
序言
我对线程池的认识经历了以下三个阶段
1 会使用Executors的API,觉得很cool,很简单。
2 到配置线程池参数,理解线程池参数,池化资源复用,减少上下文切换,参数关系构成了线程池的执行过程。
3 任务,任务提交,任务执行的抽象理解,从ThreadPoolExecutor到ScheduledThreadPoolExecutor到ForkJoinPool,CompletableFuture的理解。
我现在的理解是:Java并发提供了三个核心抽象概念(任务,任务提交和取消,任务执行
),具体来说:
1 任务 任务的抽象从Runnable,Callable,FutureTask,到ForkJoinTask 子类RecursiveTask,RecursiveAction,以及CompletableFuture中的Completion对ForkJoinTask 的继承,对AsynchronousCompletionTask的实现。
2 任务提交和取消 从ExecutorService到ExecutorCompletionService,实现submit,invoke方法,核心子类:AbstractExecutorService作为骨架实现
3 任务执行 从Executor到核心子类ThreadPoolExecutor(核心方法execute),ForkjoinPool(因为重写了提交机制,所以核心方法submit和execute),ScheduledThreadPoolExecutor也是种执行机制。纯接口包含了命令模式,模板模式,状态机模式等等。这就意味着你可以自定义提交和执行机制。体现了多种策略和实现分别,非常漂亮。
传统的new Thread(new Runnable).start() 将任务,任务提交,任务执行耦合起来,也没有提供任务取消的机制,显得那么得不可用,这篇博文主要以分析ThreadPoolExecutor为主,但是站在更高的抽象层次去看,会理解更深。
结构
任务结构
每个任务都有其抽象的含义,接下来我们将分析每一个接口的类型。
1 | //代表了任务执行没有结果 |
1 | //代表了一个任务执行有结果 |
1 | //任务不仅仅被执行,还可以取消,完成,返回结果,Future对任务的抽象比Runnable更加全面,要知道通过原生Thread API |
1 | //接口多继承,仅仅是将Runnable和Future的能力结合起来,是一个mixin接口,但是还是强调了run的能力 |
1 | //真正的任务实现是FutureTask,FutureTask的构造对Callable和Runnable进行包装,使得任务成为FutureTask |
1 | //ForkJoinTask也是一种Future类型任务,其内部提供了AdaptedRunnable,AdaptedCallable的适配类, |
从上面可以看出,在JUC中,对于任务的抽象其实和任务的执行策略有关系,ThreadPoolExecutor执行的是FutureTask任务,而ScheduledThreadPoolExecutor执行的是ScheduledFutureTask,ForkJoinPool执行的是ForkJoinTask任务,这是多么清晰且统一的设计啊!
任务提交和执行结构
1 | //顶级接口,定义了任务执行,每一个任务是一个Runnable |
但是仅仅有执行还不行,还要管理任务的取消和生命周期,所以提供了ExecutorService接口,如果说Executor定义了任务执行,
那么ExecutorService提供提交定义了任务的提交和取消,提供了更加完整的任务生命周期的概念,注意到在这层抽象上,我们其实并不知道具体任务是怎么执行的(并行?串行?定期),怎么被提交的,以及怎么返回结果的,真正的实现是具体的实现类。
1 | //Executor提供执行机制,ExecutorService提供提交,取消,完成,等待完成,批量执行任务机制,其中最核心的抽象的提交机制。 |
可以看出,在抽象层,通过一系列接口来完成“任务,任务执行,任务提交和取消”等机制,而接下来章节将分析一种提交和执行机制,线程池,也就是ThreadPoolExecutor.
设计与实现
ThrealPoolExecutor整体结构
AbstractExecutorService实现
AbstractExecutorService仅仅为任务提交提供了骨架的实现,并没有为任务执行和取消提供实现,这也是面向接口设计的一个常用技巧,该类并没有实现Executor的execute方法,因为执行机制属于子类,我们其实可以提供默认实现。但是这样抽象类存在的价值将不是很大。
我们来看一下他的提交机制有哪些?
1 | //将任务包装成RunnableFuture,实际子类是FutureTask,然后子类(其实就是ThreadPoolExecutor)实现execute执行任务,最后返回执行后的任务 |
1 | //提交一个FutureTask,子类执行任务 |
1 | public <T> Future<T> submit(Callable<T> task) { |
1 | //提交一组任务,并且返回所有的任务返回值 |
FutureTask实现
任务执行
该方法实现RunnableFuture,而RunnableFuture接口继承Runnable的run方法,所有本质是任务执行时候的方法。
1 | public void run() {} |
获取任务结果
1 | public V get() throws InterruptedException, ExecutionException {} |
获取有限时间任务结果
1 | public V get(long timeout, TimeUnit unit) |
任务取消
1 | public boolean cancel(boolean mayInterruptIfRunning) {} |
ThreadPoolExecutor API
ThreadPoolExecutor 公共API较多,但是每一个都很实用。
我们主要分析和Executor和ExecutorService相关的API
1 | public void execute(Runnable command) {} |
核心构造函数:
1 | public ThreadPoolExecutor(int corePoolSize, |
ThreadPoolExecutor实现
ThreadPoolExecutor实现了线程池这种执行任务的机制,所以最核心的方法就是execute,如提交相关的方法,在其父类AbstractExecutorService已经实现了,所以该类其实就是实现了任务执行机制execute.
execute实现提供的抽象概念有,Worker和WorkQueue . Worker主要处理任务,每一个Worker是一个运行的线程,在runWoker方法中一直轮询WorkQueue的任务并执行,WorkQueue主要用于存储任务。
公共API-execute
1 | public void execute(Runnable command) { |
私有方法-addWorker
添加worker,并且启动worker,开始执行任务。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
私有方法-addWorkerFailed
1 | private void addWorkerFailed(Worker w) { |
私有非静态成员类-Worker
Worker即是锁(extends AbstractQueuedSynchronizer),也是一个工作者线程(implements Runnable),
1 | //这是一个互斥锁,且不支持重入!一个只能锁定一个任务,一个任务也只能被一个Worker锁住! |
私有方法runWorker
worker处理task的核心方法,从队列中不停地拿任务。
1 | final void runWorker(Worker w) { |
私有方法-processWorkerExit
该方法用户处理Worker因为异常情况退出,比如任务抛出异常,或者Worker被中断了
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
私有方法-getTask
1 | private Runnable getTask() { |
私有方法-interruptIdleWorkers
1 | private void interruptIdleWorkers(boolean onlyOne) { |
公共API-shutdown
1 | public void shutdown() { |
公共API-shutdownNow
1 | public List<Runnable> shutdownNow() { |
公共API-allowCoreThreadTimeOut
1 | public void allowCoreThreadTimeOut(boolean value) { |
公共API-prestartAllCoreThreads
1 | public int prestartAllCoreThreads() { |
工具方法
1 | private static int runStateOf(int c) { return c & ~CAPACITY; } |
静态字段
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
Executors实现
Executors是对执行者的静态工厂类,提供了常用的执行策略,并且提供了对任务的包装。
实战案例
tomcat线程池解读
org.apache.tomcat.util.threads.ThreadPoolExecutor
扩展ThreadPoolExecutor
多元化的拒绝策略
Apache HttpComponents Worker
1 | WorkerPoolExecutor |