java线程池使用最全详解
# 线程的创建方式
1、继承Thread,重写run方法
public class MyThread extends Thread {
@Override
public void run() {
for (int x = 0; x < 200; x++) {
System.out.println(x);
}
}
}
public class MyThreadDemo {
public static void main(String[] args) {
// 创建两个线程对象
MyThread my1 = new MyThread();
MyThread my2 = new MyThread();
my1.start();
my2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2、实现Runnable接口,重写run方法
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int x = 0; x < 100; x++) {
System.out.println(x);
}
}
}
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int x = 0; x < 100; x++) {
System.out.println(x);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
上面两种创建线程的方法,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口。
3、实现Callable接口
public static void main(String[] args) throws Exception {
FutureTask[] randomNumberTasks = new FutureTask[5];
for (int i = 0; i < 5; i++) {
Callable callable = new CallableExample();
randomNumberTasks[i] = new FutureTask(callable);
Thread t = new Thread(randomNumberTasks[i]);
t.start();
}
for (int i = 0; i < 5; i++) {
System.out.println(randomNumberTasks[i].get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 为什么要用线程池?
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控。
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:
如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行 任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立 即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线 程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以 采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时 间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的 启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时, 不会有 T1,T3 的开销了。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会 消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和 监控。
假设一个服务器一天要处理 50000 个请求,并且每个请求需要一个单独的线 程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为 50000。 一般线程池大小是远小于 50000。所以利用线程池的服务器程序不会为了创建 50000 而在处理请求时浪费时间,从而提高效率。
# Executor框架
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它将任务的提交与任务的 执行分离开来。
ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit() 的扩展,可以说是真正的线程池接口;
AbstractExecutorService 抽象类实x现了 ExecutorService 接口中的大部分方法;
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期 执行"功能 ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令, 或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
下图为它的继承与实现
从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为
1、execute(Runnable command):履行Ruannable类型的任务,
2、submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象
3、shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
4、shutdownNow():停止所有正在履行的任务并封闭办事。
5、isTerminated():测试是否所有任务都履行完毕了。
6、isShutdown():测试是否该ExecutorService已被关闭。
# ThreadPoolExecutor
线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
2
3
4
5
6
7
8
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任 务,直到当前线程数等于 corePoolSize;
如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启 动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则 创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认 情况下,该参数只在线程数大于 corePoolSize 时才有用
TimeUnit
keepAliveTime 的时间单位
workQueue
workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的 corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能,用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待, 因此线程池中的线程数不会超过 corePoolSize。
2)由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。
3)由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。
4)更重要的,使用无界 queue 可能会耗尽系统资源,有界队列则有助于防 止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、 SynchronousQueue、PriorityBlockingQueue。
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具 有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
Executors 静态工厂里默认的 threadFactory,线程的命名规则是“pool-数字 -thread-数字”。
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
2
3
4
5
6
7
8
RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务; 一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和 策略,如记录日志或持久化存储不能处理的任务。
线程池重点属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
2
3
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
ctl相关方法
// 获取运行状态;
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
4
5
6
线程池存在5种状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
2
3
4
5
6
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在
ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -
> TERMINATED。
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
# 线程池的工作机制
1)如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意, 执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
3)如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处 理任务。
4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
提交任务
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程 池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的 对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get (long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
关闭线程池
可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们 的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别, shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行 或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的 状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法 会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任 务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完, 则可以调用 shutdownNow 方法。
# 线程池监控
1、线程池已执行和未执行的任务总数
public long getTaskCount()
2、已完成的任务数量
public long getCompletedTaskCount()
3、线程池当前线程数
public int getPoolSize()
4、线程池中执行任务的线程数量
public int getActiveCount()
# 预定义线程池
FixedThreadPool 详解
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
创建使用固定线程数的 FixedThreadPool 的 API。适用于为了满足资源管理的 需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads。
当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程 等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把 keepAliveTime 设 置为 0L,意味着多余的空闲线程会被立即终止。 FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列 (队列的容量为 Integer.MAX_VALUE)。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2
3
4
5
6
7
创建使用单个线程的 SingleThread-Executor 的 API,于需要保证顺序地执行 各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
corePoolSize 和 maximumPoolSize 被设置为 1。其他参数与 FixedThreadPool 相同。SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工 作队列(队列的容量为 Integer.MAX_VALUE)。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
创建一个会根据需要创建新线程的 CachedThreadPool 的 API。大小无界的线 程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。 corePoolSize 被设置为 0,即 corePool 为空;maximumPoolSize 被设置为 Integer.MAX_VALUE。这里把 keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool 和 SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列。CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时, CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创 建过多线程而耗尽 CPU 和内存资源。
WorkStealingPool
利用所有运行的处理器数目来创建一个工作窃取的线程池,使用 forkjoin 实 现。
ScheduledThreadPoolExecutor
使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的 ScheduledThreadPoolExecutor,如下:
- ScheduledThreadPoolExecutor。包含若干个线程的 ScheduledThreadPoolExecutor。
- SingleThreadScheduledExecutor。只包含一个线程的 ScheduledThreadPoolExecutor。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
2
3
4
5
6
7
8
9
ScheduledThreadPoolExecutor 适用于需要多个后台线程执行周期任务,同时 为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor 适用于需要单个后台线程执行周期任务,同 时需要保证顺序地执行各个任务的应用场景。
提交定时任务
//向定时任务线程池提交一个延时 Runnable 任务(仅执行一次)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
2
//向定时任务线程池提交一个延时的 Callable 任务(仅执行一次)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
2
//向定时任务线程池提交一个固定时间间隔执行的任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
2
//向定时任务线程池提交一个固定延时间隔执行的任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
2
固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从 理论上讲是确定的,当然执行任务的时间不能超过执行周期。
固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于 操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动。
合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
- 任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长、中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
CPU密集型任务也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。
IO密集型任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*Ncpu。因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。
混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐 量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU 个数。
对于 IO 型的任务的最佳线程数,有个公式可以计算Nthreads = NCPU * UCPU * (1 + W/C)
其中:
- NCPU 是处理器的核的数目
- UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
- W/C 是等待时间与工作时间的比率
通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。
太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。
等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查看。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果, 等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。 如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
# 线程池使用
public static void main(String[] args) throws InterruptedException {
Long start = System.currentTimeMillis();
final Random random = new Random();
final List<Integer> list = new ArrayList<Integer>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100000; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
list.add(random.nextInt());
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("时间:"+(System.currentTimeMillis() - start));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# CompletionService
CompletionService 实际上可以看做是 Executor 和 BlockingQueue 的结合体。
CompletionService原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
CompletionService 在接收到要执行的任务时,通过类似 BlockingQueue 的 put 和 take 获得任务执行的结果。
CompletionService 的一个实现是 ExecutorCompletionService, ExecutorCompletionService 把具体的计算任务交给 Executor 完成。
在实现上,ExecutorCompletionService 在构造函数中会创建一个 BlockingQueue(使用的基于链表的 LinkedBlockingQueue),该 BlockingQueue 的作用是保存 Executor 执行的结果。 当提交一个任务到 ExecutorCompletionService 时,首先将任务包装成 QueueingFuture,它是 FutureTask 的一个子类,然后改写 FutureTask 的 done 方 法,之后把 Executor 执行的计算结果放入 BlockingQueue 中。 与 ExecutorService 最主要的区别在于 submit 的 task 不一定是按照加入时的 顺序完成的。CompletionService 对 ExecutorService 进行了包装,内部维护一个保 存 Future 对象的 BlockingQueue。只有当这个 Future 对象状态是结束的时候,才 会加入到这个 Queue 中,take()方法其实就是 Producer-Consumer 中的 Consumer。 它会从 Queue 中取出 Future 对象,如果 Queue 是空的,就会阻塞在那里,直到 有完成的 Future 对象加入到 Queue 中。所以,先完成的必定先被取出。这样就 减少了不必要的等待时间。
public class CompletionCase {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;
// 方法一,自己写集合来实现获取线程池中任务的返回结果
public void testByQueue() throws Exception {
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//队列,拿任务的执行结果
BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<>();
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
queue.add(future);
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = queue.take().get();
//System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms");
}
// 方法二:使用CompletionService
public void testByCompletion() throws Exception{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<Integer> cSevice
= new ExecutorCompletionService<>(pool);
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
cSevice.submit(new WorkTask("ExecTask" + i));
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = cSevice.take().get();
//System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms");
}
public static void main(String[] args) throws Exception {
CompletionCase t = new CompletionCase();
t.testByQueue();
t.testByCompletion();
}
public class WorkTask implements Callable<Integer> {
private String name;
public WorkTask(String name) {
this.name = name;
}
@Override
public Integer call() {
int sleepTime = new Random().nextInt(1000);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回给调用者的值
return sleepTime;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
-------------tasks sleep time 41812ms,and spend time 5664 ms -------------tasks sleep time 38178ms,and spend time 5106 ms
使用方法一,自己创建一个集合来保存 Future 存根并循环调用其返回结果 的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按 加入线程池的顺序返回。因为 take 方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
使用方法二,使用 CompletionService 来维护处理线程的返回结果时,主 线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。
应用场景
- 当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
- CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
- 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
# Fork/Join框架
分治算法
分治算法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。
分治算法的步骤如下:
- 分解:将要解决的问题划分成若干规模较小的同类问题;
- 求解:当子问题划分得足够小时,用较简单的方法解决;
- 合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。
应用场景
分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。
Fork/Join框架介绍
传统线程池ThreadPoolExecutor有两个明显的缺点:一是无法对大任务进行拆分,对于某个任务只能由单线程执行;二是工作线程从队列中获取任务时存在竞争情况。这两个缺点都会影响任务的执行效率。为了解决传统线程池的缺陷,Java7中引入Fork/Join框架,并在Java8中得到广泛应用。Fork/Join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool允许其他线程向它提交任务,并根据设定将这些任务拆分为粒度更细的子任务,这些子任务将由ForkJoinPool内部的工作线程来并行执行,并且工作线程之间可以窃取彼此之间的任务。
ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务。ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。
根据经验和实验,任务总数、单任务执行耗时以及并行数都会影响到Fork/Join的性能。所以,当你使用Fork/Join框架时,你需要谨慎评估这三个指标,最好能通过模拟对比评估,不要凭感觉冒然在生产环境使用。
Fork-Join 原理
Fork/Join的使用
Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask
ForkJoinPool
ForkJoinPool 是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池 Worker+Queue 的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞。
ForkJoinPool构造器
ForkJoinPool中有四个核心参数,用于控制线程池的并行数、工作线程的创建、异常处理和模式指定等。各参数解释如下:
- int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;
- ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
- UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;
- boolean asyncMode:设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。
按类型提交不同任务
任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:
返回值 | 方法 | |
---|---|---|
提交异步执行 | void | execute (opens new window)(ForkJoinTask (opens new window) task)execute (opens new window)(Runnable tas (opens new window)k) |
等待并获取结果 | T | invoke (opens new window)(ForkJoinTask (opens new window) task) |
提交执行获取Future结果 | ForkJoinTask | submit (opens new window)(ForkJoinTask (opens new window) task)submit (opens new window)(Callable (opens new window)task)submit (opens new window)(Runnable tas (opens new window)k)submit (opens new window)(Runnable tas (opens new window)k, T result) (opens new window) |
- execute类型的方法在提交任务后,不会返回结果。ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Runnable任务
执行Runnable类型任务时,将会转换为ForkJoinTask类型。由于任务是不可切分的,所以这类任务无法获得任务拆分这方面的效益,不过仍然可以获得任务窃取带来的好处和性能提升。
- invoke方法接受ForkJoinTask类型的任务,并在任务执行结束后,返回泛型结果。如果提交的任务是null,将抛出空指针异常。
- submit方法支持三种类型的任务提交:ForkJoinTask类型、Callable类型和Runnable类型。在提交任务后,将返回ForkJoinTask类型的结果。如果提交的任务是null,将抛出空指针异常,并且当任务不能按计划执行的话,将抛出任务拒绝异常。
ForkJoinTask
ForkJoinTask是ForkJoinPool的核心之一,它是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
- fork()——提交任务
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
- join()、get()——获取任务执行结果
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:
- RecursiveAction:用于递归执行但不需要返回结果的任务。
- RecursiveTask :用于递归执行需要返回结果的任务。
- CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数
在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。
**
* forkjoin实现的归并排序
*/
public class FkSort {
private static class SumTask extends RecursiveTask<int[]>{
private final static int THRESHOLD = 2;
private int[] src;
public SumTask(int[] src) {
this.src = src;
}
@Override
protected int[] compute() {
if(src.length<=THRESHOLD){
return InsertionSort.sort(src);
}else{
//fromIndex....mid.....toIndex
int mid = src.length / 2;
SumTask leftTask = new SumTask(Arrays.copyOfRange(src, 0, mid));
SumTask rightTask = new SumTask(Arrays.copyOfRange(src, mid, src.length));
invokeAll(leftTask,rightTask);
int[] leftResult = leftTask.join();
int[] rightResult = rightTask.join();
return MergeSort.merge(leftResult,rightResult);
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] src = MakeArray.makeArray();
SumTask innerFind = new SumTask(src);
long start = System.currentTimeMillis();
int[] invoke = pool.invoke(innerFind);
for(int number:invoke){
System.out.println(number);
}
System.out.println(" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
}
public class MakeArray {
//数组长度
public static final int ARRAY_LENGTH = 40000000;
public final static int THRESHOLD = 47;
public static int[] makeArray() {
//new一个随机数发生器
Random r = new Random();
int[] result = new int[ARRAY_LENGTH];
for(int i=0;i<ARRAY_LENGTH;i++){
//用随机数填充数组
result[i] = r.nextInt(ARRAY_LENGTH*3);
}
return result;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。
ForkJoinPool 的工作原理
- ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。
- ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从top取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO 方式。
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。
工作窃取
ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
工作窃取的运行流程如下图所示 :
- 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争;
- 工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
思考:为什么这么设计,工作线程总是从头部获取任务,窃取线程从尾部获取任务?
这样做的主要原因是为了提高性能,通过始终选择最近提交的任务,可以增加资源仍分配在CPU缓存中的机会,这样CPU处理起来要快一些。而窃取者之所以从尾部获取任务,则是为了降低线程之间的竞争可能,毕竟大家都从一个部分拿任务,竞争的可能要大很多。
此外,这样的设计还有一种考虑。由于任务是可分割的,那队列中较旧的任务最有可能粒度较大,因为它们可能还没有被分割,而空闲的线程则相对更有“精力”来完成这些粒度较大的任务。
工作队列WorkQueue
- WorkQueue 是双向列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行 LIFO。
- 每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread。
- 没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 偶数 位。
ForkJoinWorkThread
ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数 位。
总结
Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:
- 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
- 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。
在使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。