0%

阻塞队列和线程池原理

前言

本文是介绍阻塞队列和线程池。

目录

一、阻塞队列

队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

阻塞队列

1)支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

2)支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。

为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

阻塞队列是实现BlockingQueue接口,接口主要方法如下:

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(”Queuefull”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

注:阻塞队列的方法不全是阻塞的,只有put(e)take()方法会阻塞。

常用阻塞队列
  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
  • PriorityBlockingQueue(优先级阻塞队列):一个支持优先级排序的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。是用数组实现的。
  • DelayQueue(延迟阻塞队列):一个使用优先级队列实现的无界阻塞队列。是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。可运用于缓存系统的设计。是用数组实现的。
  • SynchronousQueue(同步阻塞队列):一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。是用链表实现的。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。多了transfertryTransfer方法transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者,必须等到消费者消费了才返回。tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。无论消费者是否接收,方法立即返回。
  • LinkedBlockingDeque(双向阻塞队列):一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。可设置容量防止其过度膨胀。双向阻塞队列可以运用在“工作窃取”模式中。

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的,内部使用了ReentrantLock实现同步。

有界无界

有限队列就是长度有限,满了以后生产者会阻塞,无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者JVM帮你解决烦恼,直接把你 OOM kill 了。

无界也会阻塞,为何?因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。

Array实现和Linked实现的区别
  1. 队列中锁的实现不同。ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock
  2. 在生产或消费时操作不同。ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能
  3. 队列大小初始化方式不同。ArrayBlockingQueue实现的队列中必须指定队列的大小;LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE。

二、线程池

为什么要用线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
Executor框架

Java中也有一套框架来控制管理线程,那就是Executor框架。Executor框架是JDK1.5之后才引入的,位于java.util.cocurrent 包下,可以通过该框架来控制线程的启动、执行和关闭,从而简化并发编程的操作,这是它的核心成员类图:

Executor:最上层的接口,定义了一个基本方法execute,接受一个Runnable参数,用来替代创建或启动线程的方法。

ExecutorService:继承自Executor,在其上做了一些shutdown()、submit()的扩展,提供了处理多线程的方法,可以说是真正的线程池接口。

ScheduledExecutorService:定时调度接口,支持延迟/或定期执行任务,继承自ExecutorService。

AbstractExecutorService:是执行框架的抽象类,实现了ExecutorService接口中的大部分方法;

ThreadPoolExecutor:是线程池中最核心的一个类,提供了线程池操作的基本方法,用来执行提交的任务。

ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,可以延迟或定期执行任务,比Timer更灵活,功能更强大。

Executors:线程池工厂类,可用于创建一系列有特定功能的线程池。

三、ThreadPoolExecutor详解

ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。几个重要的字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值。方法runStateOf获取运行状态,workerCountOf获取活动线程数,ctlOf获取运行状态和活动线程数的值。

COUNT_BITS:值为29的常量,在字段CAPACITY被引用计算。

CAPACITY:表示有效线程数量(workerCount)的上限,大小为 (1<<29) - 1。

下面再介绍下线程池的运行状态. 线程池一共有五种状态, 分别是:

  1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;

  2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);

  3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;

  4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

  5. TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:

    • 线程池不是RUNNING状态;

    • 线程池状态不是TIDYING状态或TERMINATED状态;

    • 如果线程池状态是SHUTDOWN并且workerQueue为空;

    • workerCount为0;

    • 设置TIDYING状态成功。

下图为线程池的状态转换过程:

threadpool-status

线程池各个参数含义

ThreadPoolExecutor构造函数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

corePoolsize:线程池的基本大小(核心线程数),当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。

keepAliveTime:线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数在设置了参数allowCoreThreadTimeOut为true或者线程数大于corePoolSize时才有用,让空闲线程等待keepAliveTime时间之后会由于超时而退出。

unit:keepAliveTime的时间单位。

workQueue:存储等待执行的任务的阻塞队列,必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。默认是Executors静态工厂里的DefaultThreadFactory,线程的命名规则是“pool-数字-thread-数字”。

handler:任务拒绝策略,当线程池已关闭(shutdown)或已达到其容量时所采用的处理策略。即当线程数达到最大线程数,且阻塞队列已满了,如果继续提交任务,会拒绝新任务,必须采取一种策略处理该任务,线程池提供了4种策略:

(1)AbortPolicy:直接抛出异常,默认策略;

(2)CallerRunsPolicy:用调用者所在的线程来执行任务;

(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的最旧任务,并执行当前任务;

(4)DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

注:corePoolsize:线程池中要保留的线程数,核心线程会一直存活,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut=true(默认false),核心线程才会超时关闭。

当线程数小于corePoolsize时,即使有线程空闲,也会优先创建新线程处理任务。

线程池的工作机制

1)如果当前运行的线程数 < corePoolSize,无论是否有空闲线程都会创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程数 >= corePoolSize且任务队列未满时,则将任务加入BlockingQueue。

3)如果运行的线程数 >= corePoolSize且任务队列已满时:分两种情况,

​ 3.1、运行的线程数 < maximumPoolSize时,则会新增线程来处理任务;

​ 3.2、运行的线程数 = maximumPoolSize时,意味着线程池的线程数已经达到了最大值,新任务会被拒绝,调用handler.rejectedExecution()方法执行任务拒绝策略。

execute方法

ThreadPoolExecutor类的核心调度方法是execute(),通过调用该方法可以向线程池提交一个不需要返回值的任务,交由线程池去执行。而ThreadPoolExecutor的工作逻辑也可以由这个方法来一步步理清。这是方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl的值,前面说了,该值记录着runState和workerCount
int c = ctl.get();
/*
* 调用workerCountOf得到当前活动的线程数;
* 当前活动线程数小于corePoolSize,新建一个线程放入线程池中;
* addWorker(): 把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//如果上面的添加线程操作失败,重新获取ctl值
c = ctl.get();
}
//如果当前线程池是运行状态,并且往工作队列中添加该任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/*
* 如果当前线程不是运行状态,把任务从队列中移除
* 调用reject(内部调用handler)拒绝接受任务
*/
if (! isRunning(recheck) && remove(command))
reject(command);
//获取线程池中的有效线程数,如果为0,则执行addWorker创建一个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}

1、判断当前运行中的线程数是否小于corePoolSize,是的话则调用addWorker创建线程执行任务。

2、不满足1的条件,就把任务放入工作队列workQueue中。

3、如果任务成功加入workQueue,判断线程池是否是运行状态,不是的话先把任务移出工作队列,并调用reject方法,使用拒绝策略拒绝该任务。如果是运行状态但线程数为0,调用addWorker创建一个新线程。

4、如果放入workQueue失败 (队列已满),则调用addWorker创建线程执行任务,如果这时创建线程失败 (addWorker传进去的第二个参数值是false,说明这种情况是当前线程数不小于maximumPoolSize),就会调用reject()方法(内部调用handler)拒绝接受任务。

所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。整个执行流程用一张图片表示大致如下:

addWorker方法

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

/**线程池状态不为SHUTDOWN时
* 判断队列或者任务是否为空,是的话返回false
*/.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
/* 这里可以看出core参数决定着活动线程数的大小比较对象
* core为true表示与 corePoolSize大小进行比较
* core为false表示与 maximumPoolSize大小进行比较
* 当前活动线程数大于比较对象就返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker对象w
w = new Worker(firstTask);
//实例化w的线程t
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet,保存着任务的worker对象
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

从代码中可以看出,addWorker方法的主要工作是在线程池中创建一个新的线程并执行,其中firstTask参数指定的是新线程需要执行的第一个任务,core参数决定于活动线程数的比较对象是corePoolSize还是maximumPoolSize。根据传进来的参数首先对线程池和队列的状态进行判断,满足条件就新建一个Worker对象,并实例化该对象的线程,最后启动线程。

Worker类

根据addWorker源码中的逻辑,我们可以发现,线程池中的每一个线程其实都是对应的Worker对象在维护的。从Worker类的构造函数可以看出,当实例化一个Worker对象时,Worker对象会把传进来的Runnable参数firstTask赋值给自己的同名属性,并且用线程工厂也就是当前的ThreadFactory来新建一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

1
2
3
4
5
6
7
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。

同时,因为Worker实现了Runnable接口,所以当Worker类中的线程启动时,调用的其实是run()方法。run方法中调用的是runWorker方法,我们来看下它的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
//允许中断
w.unlock(); // allow interrupts
//是否因为异常退出循环的标志,processWorkerExit方法会对该参数做判断
boolean completedAbruptly = true;
try {
//判断task是否为null,是的话通过getTask()从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/* 这里的判断主要逻辑是这样:
* 如果线程池正在停止,那么就确保当前线程是中断状态;
* 如果不是的话,就要保证不是中断状态
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//用于记录任务执行前需要做哪些事,属于ThreadPoolExecutor类中的方法,
//是空的,需要子类具体实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

总结一下runWorker方法的运行逻辑:

1、通过while循环不断地通过getTask()方法从队列中获取任务;

2、如果线程池正在停止状态,确保当前的线程是中断状态,否则确保当前线程不中断;

3、调用task的run()方法执行任务,执行完毕后需要置为null;

4、循环调用getTask()取不到任务了,跳出循环,执行processWorkerExit()方法。

看完runWorker()的运行流程,我们来看下getTask()是怎么实现的。

getTask方法

getTask()方法的作用是从队列中获取任务,下面是该方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private Runnable getTask() {
//记录上次从队列获取任务是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//将workerCount减1
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
/* timed变量用于判断线程的操作是否需要进行超时判断
* allowCoreThreadTimeOut不管它,默认是false
* wc > corePoolSize,当前线程是如果大于核心线程数corePoolSize
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
/* 根据timed变量判断,如果为true,调用workQueue的poll方法获取任务,
* 如果在keepAliveTime时间内没有获取到任务,则返回null;
* timed为false的话,就调用workQueue的take方法阻塞队列,
* 直到队列中有任务可取。
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//r为null,说明time为true,超时了,把timedOut也设置为true
timedOut = true;
} catch (InterruptedException retry) {
//发生异常,把timedOut也设置为false,重新跑循环
timedOut = false;
}
}
}

getTask的代码看上去比较简单,但其实内有乾坤,我们来重点分析一下两个if判断的逻辑:

1、当进入getTask方法后,先判断当前线程池状态,如果线程池状态rs >= SHUTDOWN,再进行以下判断:

1)rs 的状态是否大于STOP;2)队列是否为空;

满足以上条件其中之一,就将workerCount减1并返回null,也就是表示队列中不再有任务。因为线程池的状态值是SHUTDOWN以上时,队列中不再允许添加新任务,所以上面两个条件满足一个都说明队列中的任务都取完了。

2、进入第二个if判断,这里的逻辑有点绕,但作用比较重要,是为了控制线程池的有效线程数量,我们来具体解析下代码:

wc > maximumPoolSize:判断当前线程数是否大于maximumPoolSize,这种情况一般很少发生,除非是maximumPoolSize的大小在该程序执行的同时被进行设置,比如调用ThreadPoolExecutor中的setMaximumPoolSize方法。

timed && timedOut:如果为true,表示当前的操作需要进行超时判断,并且上次从队列获取任务已经超时。

wc > 1 || workQueue.isEmpty():如果工作线程大于1,或者阻塞队列是空的。

compareAndDecrementWorkerCount:比较并将线程池中的workerCount减1

在上文中,我们解析execute方法的逻辑时了解到,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,仍然可以增加工作线程。

但调用getTask()取任务的过程中,如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,也就是不断的让任务被取出,让线程数量保持在corePoolSize即可,直到getTask方法返回null。

而当getTask方法返回null后,runWorker方法中就会因为取不到任务而执行processWorkerExit()方法。

processWorkerExit方法

processWorkerExit方法的作用主要是对worker对象的移除,下面是方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//是异常退出的话,执行程序将workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers的集合中移除worker对象,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

至此,从execute方法开始的整个运行过程就完毕了,总结一下该流程:

执行execute –> 新建Worker对象,并实例化线程 –> 调用runWorker方法,通过getTask()获取任务,并执行run方法 –> getTask()方法中不断向队列取任务,并将workerCount数量减1,直至返回null –> 调用processWorkerExit清除worker对象。用一张流程图所示:

submit方法

用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdownshutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别:

shutdown()将线程池的状态设置成SHUTDOWN状态,中断所有空闲的线程,正在执行的不会被中断。

shutdownNow()首先将线程池的状态设置成STOP状态,中断所有线程,包括正在执行或空闲的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

•任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

•任务的优先级:高、中和低。

•任务的执行时间:长、中和短。

•任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

CPU密集型任务(取数计算),应配置尽可能少的线程(计算过程中减少线程切换调度),应配置机器的CPU核心数+1个线程的线程池。

IO密集型任务(网络通信/读写磁盘等),线程并不是一直在执行任务,则应配置尽可能多的线程,如机器的CPU核心数*2。通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

总结

Java线程池核心ThreadPoolExecutor的使用和原理分析

深入理解Java线程池:ThreadPoolExecutor

Java并发编程:线程池的使用