线程池原理

线程池原理

线程池是什么

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。
线程池(Thread Pool)是一种基于池化思想管理线程的工具,线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

优点
  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

创建线程池

  1. 使用Executors提供的一些工厂方法来快速创建ThreadPoolExecutor实例。但是不推荐,各个都有缺点。
    使用Executors.newCachedThreadPool可以快速创建一个拥有自动回收线程功能且没有限制的线程池。
    使用Executors.newFixedThreadPool可以用来创建一个固定线程大小的线程池。
    使用Executors.newSingleThreadExecutor可以用来创建一个单线程的执行器。
  2. 通过构造参数创建ThreadPoolExecutor实例。(推荐,可以设置合理的核心线程数,最大线程数,队列,也可以熟悉线程池的原理)。
    除了通过构造参数设置这几个线程池参数之外我们还可以在运行时设置。
    ThreadPoolExecutor提供了这几个参数的getset方法。

ThreadPoolExecutor还提供了protected类型得可以被覆盖的方法,允许用户在任务执行之前或执行之后做一些事情。我们可以通过它来实现比如初始化ThreadLocal、收集统计信息、如记录日志等操作,比如

1
2
3
/* Extension hooks */
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

另外还有一个Hook可以用来在任务被执行完的时候让用户插入逻辑:

1
protected void terminated() { }

线程池核心设计与实现

设计

Java中的线程池核心实现类是ThreadPoolExecutor
![ThreadPoolExecutor UML类图](/images/JUC/ThreadPoolExecutor/ThreadPoolExecutor UML类图.png)
ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。其运行机制如下图所示:
ThreadPoolExecutor运行流程
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

实现
生命周期

线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。类似于读写锁ReentrantReadWriteLock维护读、写锁数量分别使用高16位(读)和低16位(写)。

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

1
2
3
private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种:

1
2
3
4
5
6
//runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//能接受新提交的任务,并且能处理阻���队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;//关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
private static final int STOP = 1 << COUNT_BITS;//不再接受新任务,也不处理队列中的任务,会中断正在处理任务的线程
private static final int TIDYING = 2 << COUNT_BITS;//所有线程都已终止,workerCount(有效线程数)为0
private static final int TERMINATED = 3 << COUNT_BITS;//在terminated()方法执行完后进入该状态

线程池生命周期

任务执行机制
  1. 任务调度
    任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。
    首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
    a. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。>RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
    b. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    c. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    d. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    e. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
    任务调度流程
  2. 任务缓冲
    线程池的本质是对任务和线程的管理,在线程池中是以生产者消费者模式实现的,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
    使用不同的队列可以实现不一样的任务存取策略:
    各种队列实现的任务策略
  3. 任务申请
    根据线程调度可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
    线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现。

获取任务流程图

在不考虑异常的场景下,返回null,就表示退出循环,结束线程:

1、线程池的状态已经是STOP(1),TIDYING(2), TERMINATED(3),或者是SHUTDOWN(0)且工作队列为空,allowCoreThreadTimeOut默认值是false,即核心线程不会超时不会被回收(可自行设置),所以线程池一直是RUNNING状态。

2、工作线程数已经大于最大线程数或者当前工作线程已超时,并且还有其他工作线程或任务队列为空.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池的状态已经是STOP(1),TIDYING(2), TERMINATED(3),或者是SHUTDOWN(0)且工作队列为空.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut默认为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作线程数已经大于最大线程数或者当前工作线程已超时,并且还有其他工作线程或任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 此处CAS保证了安全,比方allowCoreThreadTimeOut为false的情况下,多条空闲线程到这导致核心线程比定义的少。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  1. 任务拒绝(拒绝策略)
    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
    拒绝策略是一个接口:
    1
    2
    3
    public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    用户可以通过实现这个接口去定制拒绝策略,线程池本身提供了四种拒绝策略:
    四种拒绝策略
Worker线程管理
  1. Worker线程
    线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

    1
    2
    3
    4
     private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
    }

    Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
    Worker执行任务
    线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
    Worker是通过继承AQS实现独占锁这个功能,没有使用可重入锁ReentrantLock。使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
    a. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
    b. 如果正在执行任务,则不应该中断线程。
    c. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
    d. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

  2. Worker线程增加
    增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的(任务管理),该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。
    addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
    申请线程执行任务

  3. Worker线程回收
    线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

    1
    2
    3
    4
    5
    6
    7
    try {
    while (task != null || (task = getTask()) != null) {
    //执行任务
    }
    } finally {
    processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
    }

    事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

  4. Worker线程执行任务
    Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
    a. while循环不断地通过getTask()方法获取任务。
    b. getTask()方法从阻塞队列中取任务。
    c. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。

d. 执行任务。

e. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
执行任务流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

业务应用

线程池构造参数有7个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

动态设置

设置核心线程数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}

在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。

对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel(闲置的,空闲的)的时候也会被回收;

对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务。

设置核心线程数

设置最大线程数
1
2
3
4
5
6
7
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

1.首先是参数合法性校验。

2.然后用传递进来的值,覆盖原来的值。

3.判断工作线程是否是大于最大线程数,如果大于,则对空闲线程发起中断请求。

  1. 如何设置

    设置核心线程数的时候,同时设置最大线程数即可。其实可以把二者设置为相同的值。

    原因:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    //如果工作线程数大于最大线程数,则对工作线程数量进行减一操作,然后返回 null。
    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    continue;
    }

    try {
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

    这个地方的实际流程应该是: 创建新的工作线程 worker,然后工作线程数进行加一操作。 运行创建的工作线程 worker,开始获取任务 task。 工作线程数量大于最大线程数,对工作线程数进行减一操作。 返回 null,即没有获取到 task。 清理该任务,流程结束。

  2. 如果调整之后把活动线程数设置的值太大了,岂不是业务低峰期我们还需要人工把值调的小一点?

    答案是不存在

    1
    2
    * @param corePoolSize the number of threads to keep in the pool, even
    * if they are idle, unless {@code allowCoreThreadTimeOut} is set

    allowCoreThreadTimeOut 参数设置为 true 的时候,核心线程在空闲了keepAliveTime 的时间后也会被回收的,相当于线程池自动给你动态修改了。

动态设置队列长度(看的美团的线程池设置,他们用的队列是LinkedBlockingQueue)

并没有设置队列长度的 set 方法啊…源码:

1
2
3
//队列的 capacity 是被 final 修饰了
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

想要设置可以自定义队列(比如:自定义队列内容完全和linkedBlockingQueue一样,只是Capacity的final给去掉,并提供get和set)

其他问题:

a. 线程池被创建后里面有线程吗?如果没有的话,你知道有什么方法对线程池进行预热吗?

线程池被创建后如果没有任务过来,里面是不会有线程的。如果需要预热的话可以调用下面的两个方法:

全部启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

启动一个:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}

b. 核心线程数会被回收吗?需要什么设置?

核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;

public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}

线程池原理
http://www.muzili.ren/2022/06/11/ThreadPoolExecutor/
作者
jievhaha
发布于
2022年6月11日
许可协议