1. 介绍

并发包提供了几种不同类型的线程池。今天我们来更加仔细地看看其实现原理。下面的分析适用于JDK1.7

2. 类分析

类的结构如下:

2.1 池分类

  1. 通过ThreadPoolExecutor创建的池: 例如Fixed pool 、Cached pool和singleThread等线程池。在1.8中还引入了WorkStealingPool这个基于ForkJoinPool里面窃取工作算法的线程池,可以关注下。
  2. 通过ScheduledThreadPoolExecutor创建的池
  3. ForkJoinPool:这个可以参考之前的文章总结:ForkJoinPool解读

3. ThreadPoolExecutor

3.1 方法签名

基于这个Executor创建的池是平常最常用的。所以我们来重点看下他的原理。首先看下他构造方法的签名:

根据源码里面的注释,我们来看看这些参数的含义:

  1. int corePoolSize:线程池能保留运行的线程数(即使为idle也会保留运行)
  2. int maximumPoolSize:线程池最大大小。如果需要创建的线程数量大于core size,但是小于max size,会尝试先放入工作队列等待调度。如果队列也满了则创建线程。
  3. long keepAliveTime: 大于core size的那部分线程如果处于空闲,能存活的最长时间
  4. TimeUnit unit: 保持活动时间单位
  5. BlockingQueue workQueue:任务被执行前,都会先加入该工作队列等待调度
  6. ThreadFactory threadFactory:线程工厂,用于创建新的线程
  7. RejectedExecutionHandler handler:拒绝执行处理器。当创建的线程数量超过max pool size就要使用不同的拒绝策略。

同时放下源码的解释,大家可以对照英文来理解:

3.2 ThreadPoolExecutor的execute方法执行过程

可以看看源码中执行的3步

3.3 关于BlockingQueue的作用

当线程池大小 >= corePoolSize 且 队列未满时,这时线程池使用者与线程池之间构成了一个生产者-消费者模型。线程池使用者生产任务,线程池消费任务,任务存储在BlockingQueue中,注意这里入队使用的是offer,当队列满的时候,直接返回false,而不会等待

关于BlockingQueue的分析见我另一篇文章:BlockingQueue浅析

4. 生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位来表示运行状态,分别是:

  1. RUNNING:接收新任务,并且处理任务队列中的任务
  2. SHUTDOWN:不接收新任务,但是处理任务队列的任务
  3. STOP:不接收新任务,不出来任务队列,同时中断所有进行中的任务
  4. TIDYING:所有任务已经被终止,工作线程数量为 0,到达该状态会执行terminated()
  5. TERMINATED:terminated()执行完毕

5. 线程池模型

5.1 几种模型

下面几种线程池只讨论Executors里面提供的。

  1. CachedThreadPool(采用SynchronousQueue作为工作队列):一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。适合许多生命周期较短的同步任务。“缓存”是因为设定了队列中任务的超时时间。这里之所以使用SynchronousQueue作为工作队列是很值得思考的。使用SynchronousQueue,等于新过来的任务由于没有take从队列取数据的线程,所以所有的put队列的操作都阻塞了,因此无法放数据到队列则直接创建了新的线程。
  2. FixedThreadPool(LinkedBlockingQueue作为工作队列):一个固定大小的线程池,提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的大小将不再变化。因为使用看了无界队列,永远不会触发产生新的线程,也不会触发拒绝任务。
  3. SingleThreadPool(LinkedBlockingQueue作为工作队列):一个单线程的线程池,它只有一个工作线程来执行任务,可以确保按照任务在队列中的顺序来串行执行,如果这个线程异常结束将创建一个新的线程来执行任务。实际上就是FixedThreadPool的一种特殊形式。
  4. ScheduledThreadPool(使用DelayedWorkQueue):一个固定大小的线程池,并且以延迟或者定时的方式来执行任务,类似于Timer。
  5. WorkingStealingPool(JDK1.8才有): 基于ForkJoinPool类实现工作窃取算法

5.2 使用 CachedThreadPool的注意点

  1. 默认实现除了CachedThreadPool,其他的线程池core size都是等于max size。CachedThreadPool的max size为Integer.MAX_VALUE
  2. 确保处理速度大于生产速度的情况下使用这个线程池(最好处理那种耗时较短的小任务),否则内存溢出。

6. 拒绝策略

达到MAX_SIZE之后队列满了,就要拒绝新任务了。主要以下几种方式:

  1. AbortPolicy:默认策略,终止任务,抛出RejectedException
  2. CallerRunsPolicy:在调用者线程执行当前任务,不抛异常
  3. DiscardPolicy: 抛弃策略,直接丢弃任务,不抛异常
  4. DiscardOldersPolicy:抛弃最老的任务,执行当前任务,不抛异常

6.1 使用CallerRunsPolicy

线程池使用CallerRunsPolicy作为其饱和策略. 如果线程池饱和时main线程仍然向线程池提交task, 那么task将在main中执行. main线程执行task是需要一定时间的, 这样就给了线程池喘息的机会, 而且main线程在执行task的时间内无法接受socket连接, 因此socket连接请求将缓存在tcp层. 如果server过载持续的时间较长, 使得tcp层的缓存不够, 那么tcp缓存将根据其策略丢弃部分请求. 如此一来, 整个系统的过载压力逐步向外扩散: 线程池-线程池中的队列-main线程-tcp层-client. 这样的系统在发生过载时是比较优雅的: 既不会因为过多的请求而导致系统资源耗尽, 也不会一发生过载时就拒绝服务, 只有发生长时间系统过载时才会出现客户端无法连接的情况.
PS: 注意下CallerRunsPolicy这个策略

参考资料:

  1. 再聊java线程池实现原理
  2. Java 线程池框架核心代码分析
  3. ThreadPoolExecutor线程池解析与BlockingQueue的三种实现