为什么要线程池
学习多线程的过程中,最初我们创建线程通常是通过 「new Thread()」 方式来创建一个线程,创建完成之后调用线程的start方法启动线程。通常代码如下所示:
Thread t = new Thread(() ->{
// 业务逻辑
});
t.start()
这样做能完成任务,但是却存在下面几个问题:
-
反复的创建和销毁线程,资源消耗较高。每次都需要手动的创建线程,在线程完成任务后线程销毁。在Java中创建线程的开销是很大,频繁的创建和销毁无疑会消耗大量资源。 -
不便于管理。手动创建线程,我们并不能清楚的知道到底创建了多少个,即使能知道也需要我们自己来实现线程的管理。同样我们没法控制并发的数量,无法控制并发数量容易导致太多线程对同一资源的争夺导致程序运行较低。 -
资源的隔离。通过线程池可以很好的将资源隔离,这样能避免局部故障导致全局故障。例如我们在hystrix中的壁仓就是使用的线程池来隔离的。
上面正式手动创建线程的缺点,而线程池正好就好解决上述问题。
ExecutorService
ExecutorService就是java中线程池的接口定义,它的接口以及实现类如下如所示:
该接口主要定义了一些列提交任务和关闭线程池的相关方法。
ThreadPoolExecutor
ExecutorService有很多实现,这里就以我们使用最多的ThreadPoolExecutor作为主要分析对象。
构造方法
ThreadPoolExecutor提供了一系列的构造方法同样还有一系列的参数,能正确理解这些参数的实际意义基本上算是对线程池有一般的了解了。我选取参数最全的构造函数作为示例:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:核心线程池大小。需要记住的是线程池刚被创建出来时并不会立马创建出「corePoolSize」个线程,只有当任务提交到线程池时才会创建新的线程。 -
maximumPoolSize:最大线程个数。简单的说就是线程池最大的线程数量。有同学会问如果最大线程数还是无法满足呢?那就是直接执行拒绝策略了,具体是什么策略取决于你设置的拒绝策略。 -
keepAliveTime:当线程数大于「corePoolSize」时,多余的空闲线程在终止前等待的最长时间。 -
unit:keepAliveTime的时间单位。 -
workQueue:工作队列。当「corePoolSize」数达到后,如果还有任务继续提交,任务将被存在「workQueue」队列,而不是创建新的线程来处理。 -
threadFactory:线程工厂。用来创建线程的工厂。 -
handler:拒绝策略。当「workQueue」也无法存放更多线程,且线程池线程达到「maximumPoolSize」时,如果还有任务提交到线程池,这时候就会触发拒绝策略。
任务提交到线程池的过程
通过上面的内容我们大致知道了构造参数的作用,但是可能还是不能很好的理解。我们通过下图了解一个任务被提交到线程池的整个流程。
workQueue对maximumPoolSize的影响
现在有一个问题,如果workQueue是无界队列,请问maximumPoolSize是否还有效?
从流程图我们分析可以得出,如果我们的队列是无界队列,「maximumPoolSize」参数将失去它的意义。因为当线程池中的线程数达到corePoolSize之后,再向线程池提交任务,任务将被存放到「workQueue」中,而不是创建新的线程。如果线程在处理任务时耗时较久,任务将会持续的堆积在workQueu中,最后导致内存溢出。
public class ThreadPoolDemo1 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor service = new ThreadPoolExecutor(1,10,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
for (int i = 0; i < 100; i++) {
System.out.println(i);
service.submit(new Task(i));
}
}
}
class Task implements Runnable{
private int taskId;
private byte[] bytes = new byte[1024 * 1024];
public Task(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
try {
Thread.sleep(500);
System.out.printf("任务[%d]的执行线程名为[%s]n",taskId,Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如上所示,即使我将「maximumPoolSize」设置成100,然后向线程池中持续提交100个任务。而每个任务所占的内存大小在1M(近似的计算),并且在任务中打印出执行当前任务的线程名称。为了能更快的达到试验效果,我们添加 「-Xms50m -Xmx50m」 设置内存的大小为50。
启动程序之后,当添加任务数量在44个左右时,主线程抛出内存溢出。
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at com.buydeem.share.Task.<init>(ThreadPoolDemo1.java:26)
at com.buydeem.share.ThreadPoolDemo1.main(ThreadPoolDemo1.java:19)
而每个任务的线程名称都是相同的。
任务[0]的执行线程名为[pool-1-thread-1]
任务[1]的执行线程名为[pool-1-thread-1]
任务[2]的执行线程名为[pool-1-thread-1]
任务[3]的执行线程名为[pool-1-thread-1]
....
我们通过修改线程池的配置参数,如下所示:
new ThreadPoolExecutor(1,10,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(10));
与之前不同的是,我们设置了workQueue为10,这意味着当workQueue队列满时,将会创建新的线程来处理任务而不是堆积在队列中导致内存溢出。
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@13b6d03 rejected from java.util.concurrent.ThreadPoolExecutor@3ecf72fd[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.buydeem.share.ThreadPoolDemo1.main(ThreadPoolDemo1.java:19)
修改参数后,当线程数达到「maximumPoolSize」时将执行拒绝策略。关于拒绝策略会在后面细说,这里就不展开了。同时执行任务的线程也不只是一个线程,而是很多线程执行任务。
任务[0]的执行线程名为[pool-1-thread-1]
任务[19]的执行线程名为[pool-1-thread-10]
任务[18]的执行线程名为[pool-1-thread-9]
任务[17]的执行线程名为[pool-1-thread-8]
任务[16]的执行线程名为[pool-1-thread-7]
任务[13]的执行线程名为[pool-1-thread-4]
拒绝策略
从上面的内容我们知道了,当线程数达到「maximumPoolSize」且「workQueue」也满了之后,再向线程池提交任务时将会执行拒绝策略。在线程池中提供了默认的四种拒绝策略分别如下:
-
「CallerRunsPolicy」:该策略的意思是如果现在有线程t1向线程池提交新的任务,此时该任务将由提交任务的线程去执行,也就是线程t1。
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor service = new ThreadPoolExecutor(1,1,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1),new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 5; i++) {
int taskId = i;
service.execute(() ->{
try {
Thread.sleep(500);
System.out.printf("任务[%d]的执行线程名为[%s]n", taskId,Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
最后的打印结果如下:
任务[2]的执行线程名为[main]
任务[0]的执行线程名为[pool-1-thread-1]
任务[3]的执行线程名为[main]
任务[1]的执行线程名为[pool-1-thread-1]
任务[4]的执行线程名为[pool-1-thread-1]
从打印结果可以看出,有部分任务是「main」线程执行的。
-
「AbortPolicy」:该策略的意思是直接抛出异常,同时该策略正是默认的拒绝策略。
ThreadPoolExecutor service = new ThreadPoolExecutor(1,1,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1),new ThreadPoolExecutor.AbortPolicy());
打印结果如下:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.buydeem.share.ThreadPoolDemo2$$Lambda$1/457233904@11531931 rejected from java.util.concurrent.ThreadPoolExecutor@45c8e616[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.buydeem.share.ThreadPoolDemo2.main(ThreadPoolDemo2.java:19)
任务[0]的执行线程名为[pool-1-thread-1]
任务[1]的执行线程名为[pool-1-thread-1]
从结果可以看出,「AbortPolicy」策略抛出异常。主线程异常同样导致后续任务无非被提交到线程池。
-
「DiscardPolicy」:该策略的意思是任务被抛弃了,任务将不会被执行。
ThreadPoolExecutor service = new ThreadPoolExecutor(1,1,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1),new ThreadPoolExecutor.DiscardOldestPolicy());
打印结果如下:
任务[0]的执行线程名为[pool-1-thread-1]
任务[4]的执行线程名为[pool-1-thread-1]
从打印结果可以看出,程序既没有报错,但是打印的结果只有两条,其他任务并没有执行而是被抛弃了。
-
「DiscardOldestPolicy」:该策略与「DiscardPolicy」类似,但是不同的是被抛弃的是队列中最老的任务,而刚被提交上去的任务并不会被抛弃。
ThreadFactory 线程工厂
ThreadFactory的作用就是用来创建线程池中的线程的。设置threadFactory参数,可以自定义线程。例如我们可以设置线程的名称,这样便于我们排查多线程中的错误,知道该错误是哪个线程池中的哪个线程抛出的。在实际开发中,强烈推荐使用自定义线程工厂,这样能大大的提高我们排错的效率。
下面我实现一个简单的线程工厂,为线程池中的每个线程创建一个自定义名称。
public class ThreadFactoryDemo {
public static void main(String[] args) {
ThreadFactory threadFactory = new SimpleThreadFactory("订单");
ExecutorService service = new ThreadPoolExecutor(1,10,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1),threadFactory);
for (int i = 0; i < 5; i++) {
final int taskId = i;
service.execute(new Runnable() {
@Override
public void run() {
System.out.printf("任务[%d]的执行线程名为[%s]n", taskId,Thread.currentThread().getName());
}
});
}
}
}
class SimpleThreadFactory implements ThreadFactory{
private final String prefixName;
private final AtomicInteger index = new AtomicInteger(0);
public SimpleThreadFactory(String prefixName) {
this.prefixName = prefixName;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(prefixName + "-" + index.getAndIncrement());
return thread;
}
}
运行代码打印效果如下:
任务[0]的执行线程名为[订单-0]
任务[1]的执行线程名为[订单-0]
任务[4]的执行线程名为[订单-3]
任务[3]的执行线程名为[订单-2]
任务[2]的执行线程名为[订单-1]
如何提交线程
通过上面我们知道了如何创建一个线程池,接下来就是如何把创建的任务放入到线程池中执行。
Runnable与Callable的区别
将任务提交到线程池有两类方法,第一种则是通过「execute」提交「Runnable」类型的任务,另一类则是通过「submit」提交「Callable」类型的任务。这两种类型的任务有什么区别呢?
简单的说就是「Runnable」执行完成之后没有返回值,而「Callable」可以获取返回值。
public class SubmitAndExecuteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor server = new ThreadPoolExecutor(2,4,30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1024));
Runnable r1 = new Runnable() {
@Override
public void run() {
System.out.println("Runnable: hello world");
}
};
Callable<String> c1 = new Callable<String>() {
@Override
public String call() throws Exception {
return "Callable: hello world";
}
};
server.execute(r1);
Future<String> f1 = server.submit(c1);
System.out.println(f1.get());
server.shutdown();
}
}
例如上面示例代码所示,我们可以通过「Callable」返回线程执行完成之后的结果,有时候还是非常方便的。除了这一点之外它们之间并没有太大的区别。
Futrue是什么
在上面的讲Callable时说到我们可以获取线程执行完成的结果。但是现在的问题是,主线程将任务提交到线程池,然后就继续执行后面的逻辑了。对于提交到线程池执行的任务,主线程并不知道什么时候执行完,那要怎样获取这个线程执行完成的结果呢?
答案就是Future。Future用来表示异步结果,它提供了检查计算是否完成的方法。通常情况下我们可以通过调用其get方法获取任务的执行结果,需要注意的是该方法时阻塞的,简单的说就是线程调用该方法后,线程就会被阻塞到任务执行完成或者等待时间截止(提供了get超时设置)。
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor server = new ThreadPoolExecutor(2,4,30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1024));
Future<String> future = server.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "hello world";
}
});
System.out.println("等待获取future结果");
String taskValue = future.get();
System.out.println("future的结果为:"+taskValue);
server.shutdown();
}
}
单个提交和批量提交
前面讲了如何提交任务,例如submit或者execute方法,不过这种方法的局限性在于一次只能提交一个任务。如果现在需要提交多个任务该怎么办呢?
ThreadPoolExecutor提供了invokeAll和invokeAny两个方法用来批量提交任务。这两个方法有什么区别呢?
invokeAll的意思是将提交的批量任务全部执行完成,然后返回一个Future列表,通过Future列表获取所有任务的结果。而InvokeAny的意思是批量提交的任务只要任何一个完成都行,对于未完成的任务会取消。
public class CommitDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor server = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final int taskId = i;
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("开始任务:" + taskId);
Thread.sleep(taskId * 1000);
return String.format("结果[%d]", taskId);
}
};
tasks.add(callable);
}
//invokeAll
List<Future<String>> f1s = server.invokeAll(tasks);
System.out.println("==========invokeAll==========");
for (Future<String> f1 : f1s) {
System.out.println(f1.get());
}
//invokeAny
System.out.println("==========invokeAny==========");
String result = server.invokeAny(tasks);
System.out.println(result);
server.shutdown();
}
}
上面代码展示了invokeAll和invokeAny之间的区别。最后运行程序的打印效果如下:
开始任务:1
开始任务:0
开始任务:2
==========invokeAll==========
结果[0]
结果[1]
结果[2]
==========invokeAny==========
开始任务:1
开始任务:0
开始任务:2
结果[0]
从结果可以看出,invokeAll里面的线程都执行完成了输出了三个结果,而invokeAny中虽然也开始了三个任务的执行,但是只输出了最先执行完成(也就是未抛出异常)的结果。
invokeAny会对未执行完成的线程执行中断操作,如果看了我之前写的中断线程相关文章的同学一定会有一个疑问,为什么控制台没有打印出异常。因为如果线程在「TIMED_WATING」被中断,线程是会抛出中断异常的,然而我们的控制台中并未抛出。其实线程是抛出中断异常的,我们可以通过try catch捕捉Thread.sleep处获取到该异常。
关闭线程池
前面介绍了如何提交任务到线程池让线程池执行,当所有的任务都执行完成之后,我们需要关闭线程池。线程池为我们提供了两个关闭线程池的方法,这个两个方法都可以用来关闭线程池,但是它们在某些方面表现不同。
shutdown和shutdownNow的区别
当调用shutdown时,对于线程池中还未执行完成的线程会继续执行,这里面不仅包含正在执行的任务,同时还包含了在workQueue中等待执行的任务。如果已经调用过shutdown之后,如果再向线程池提交任务,新任务将被拒绝。
public class ShutDownDemo {
public static void main(String[] args) {
ThreadPoolExecutor server = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 5; i++) {
final int taskId = i;
server.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("任务[%d]执行完成!n",taskId);
}
});
}
//关闭线程池
server.shutdown();
//线程池关闭之后提交任务
server.execute(new Runnable() {
@Override
public void run() {
System.out.println("shutdown后提交的任务");
}
});
}
}
如上面代码所示,我们向线程池提交了5个任务,并且设置了每个任务中休眠2s。对于线程池的初始化参数,我们设置了corePoolSize为2,以及workQueue长度为10,这样保证了超过corePoolSize的任务将被提交的workQueue中。运行代码打印结果如下所示:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.buydeem.share.ShutDownDemo$2@2f410acf rejected from java.util.concurrent.ThreadPoolExecutor@47089e5f[Shutting down, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.buydeem.share.ShutDownDemo.main(ShutDownDemo.java:35)
任务[0]执行完成!
任务[1]执行完成!
任务[2]执行完成!
任务[3]执行完成!
任务[4]执行完成!
从打印结果可以看出,所有的任务都被成功执行了,这也说明了只要提交到线程池中任务将被执行。同时打印结果中还报了一个异常,该异常说明当线程池调用过shutdown之后,如果再将任务提交到线程池将会被拒绝。
同样作为关闭线程池的方法「shutdownNow」与「shutdown」它们之间存在什么区别呢?相比shutdown而言,shutdownNow在调用之后,还处于「workQueue」中的线程将不会被执行,同时正在执行中的任务将被中断。对于正在处理中的线程是否还能执行完成这个取决于你是否响应了中断信息号。
public class ShutdownNowDemo {
public static void main(String[] args) {
ThreadPoolExecutor server = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 5; i++) {
final int taskId = i;
server.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.printf("任务[%d]被中断n", taskId);
return;
}
System.out.printf("任务[%d]执行完成!n", taskId);
}
@Override
public String toString() {
return super.toString() + ":" + taskId;
}
});
}
//关闭线程池
List<Runnable> removeTasks = server.shutdownNow();
removeTasks.forEach(System.out::println);
}
}
与之前的代码类似,我们这里将shutdown修改成shutdownNow。程序运行的打印结果如下所示:
任务[0]被中断
任务[1]被中断
com.buydeem.share.ShutdownNowDemo$1@448139f0:2
com.buydeem.share.ShutdownNowDemo$1@7cca494b:3
com.buydeem.share.ShutdownNowDemo$1@7ba4f24f:4
从打印结果可以看出,正在执行的记录只有两条,这两个任务在执行的过程中被中断了,而在队列中未被执行的任务直接被移除。
创建线程池的工具类
通过上面的内容已经了解了如何正确的使用线程池了,在JDK中还提供了一个Executors工具类用来快速创建线程池。如果看懂了上面线程池的参数就很容易了解它创建的线程池的特征了。例如下面这个:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它是创建一个线程大小固定的线程池,它的「corePoolSize」和「maximumPoolSize」大小一样,关键点在于它的「workQueue」是一个无解队列。这样它的最大线程数就固定了,永远都不会超过「corePoolSize」。
❝
Executors创建线程池虽然方便,但是实际开发中不太推荐使用该工具类来创建。还是推荐自己手动创建线程池,设置好各种参数,这样对能更适合自身的业务。同时设置了ThreadFactory还有利于程序出现异常时我们排查错误。
❞
总结
整体来说,掌握线程池的正确使用不算太难。难点在于各种线程池参数与实际业务的匹配,在这里我们需要清楚的了解创建线程池时各个参数的作用以及影响。如果对于这些参数不太了解,在使用线程池时容易出现各种问题。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/73031.html