讲线程池之前,先讲一下线程。
什么是线程
线程又分单线程和多线程;单线程,就是一条线程在执行任务。多线程,就是创建多条线程同时执行任务。
比如我们在使用浏览器浏览网页,如果每次只能打开一个窗口,这就是单线程;我们现在浏览器肯定是可以打开多个窗口,例如一个窗口我们可以听音乐,另外窗口还可以看新闻,这就是多线程的概念。并行和并发也是这个概念,比如说你在开车,然后朋友打电话过来,并行:一边开车,一边使用蓝牙耳机接听电话,同时处理;并发:在路边停车,接电话,接完电话继续开车。
什么是线程池
创建线程需要花费资源和时间,如果任务来了才创建线程,那么响应时间会变长。而且一个进程能创建的线程数有限。
为了避免这些问题,在程序启动的时候就创建若干线程来响应处理,它们被称为线程池,里面的线程叫工作线程。
Java api提供了Executor框架可以创建不同的线程池。
为什么要使用线程池
避免频繁地创建和销毁线程,达到线程对象的重用;线程资源管理
1. 创建线程池的方法
JAVA中创建线程池主要有两类方法,一类是通过Executors工厂类提供的方法,该类提供了4种不同的线程池可供使用。另一类是通过ThreadPoolExecutor类进行创建。
1.1 通过Executors工厂类提供的方法
1.1.1)newCachedThreadPool
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
public static void main(String[] args) {
ExecutorService executorService= Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(()->{
System.out.println(LocalDateTime.now()+" "+Thread.currentThread().getName()+" "+ finalI);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
2021-06-16T10:01:27.728 pool-1-thread-1 0
2021-06-16T10:01:27.728 pool-1-thread-3 2
2021-06-16T10:01:27.728 pool-1-thread-2 1
2021-06-16T10:01:27.728 pool-1-thread-4 3
2021-06-16T10:01:27.728 pool-1-thread-5 4
2021-06-16T10:01:27.728 pool-1-thread-6 5
2021-06-16T10:01:27.728 pool-1-thread-10 9
2021-06-16T10:01:27.728 pool-1-thread-8 7
2021-06-16T10:01:27.728 pool-1-thread-7 6
2021-06-16T10:01:27.728 pool-1-thread-9 8
初始线程池没有线程,而线程不足会不断新建线程,所以线程名都是不一样的
1.1.2)newFixedThreadPool
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(LocalDateTime.now()+" "+Thread.currentThread().getName()+" "+ finalI);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
2021-06-16T10:12:36.436 pool-1-thread-2 1
2021-06-16T10:12:36.436 pool-1-thread-3 2
2021-06-16T10:12:36.436 pool-1-thread-1 0
2021-06-16T10:12:39.447 pool-1-thread-1 3
2021-06-16T10:12:39.447 pool-1-thread-2 5
2021-06-16T10:12:39.447 pool-1-thread-3 4
2021-06-16T10:12:42.452 pool-1-thread-2 6
2021-06-16T10:12:42.452 pool-1-thread-1 7
2021-06-16T10:12:42.452 pool-1-thread-3 8
2021-06-16T10:12:45.460 pool-1-thread-1 9
因为线程池大小是固定的,这里设置的是3个线程,所以线程名只有3个。因为线程不足会进入队列等待线程空闲,所以日志间隔3秒输出
1.1.3)newScheduledThreadPool
创建一个周期性的线程池,支持定时及周期性执行任务。
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.schedule(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(LocalDateTime.now()+" "+Thread.currentThread().getName()+" "+ finalI);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},3,TimeUnit.SECONDS);
}
}
2021-06-16T10:16:41.187 pool-1-thread-2 1
2021-06-16T10:16:41.187 pool-1-thread-1 0
2021-06-16T10:16:41.187 pool-1-thread-3 2
2021-06-16T10:16:44.200 pool-1-thread-1 3
2021-06-16T10:16:44.200 pool-1-thread-3 4
2021-06-16T10:16:44.200 pool-1-thread-2 5
2021-06-16T10:16:47.221 pool-1-thread-1 6
2021-06-16T10:16:47.221 pool-1-thread-2 7
2021-06-16T10:16:47.221 pool-1-thread-3 8
2021-06-16T10:16:50.236 pool-1-thread-3 9
设置了延迟3秒,所以提交后3秒才开始执行任务。因为这里设置核心线程数为3个,而线程不足会进入队列等待线程空闲,所以日志间隔3秒输出。
注意:这里用的是ScheduledExecutorService类的schedule()方法,不是ExecutorService类的execute()方法。
1.1.4)newSingleThreadExecutor
创建一个单线程的线程池,可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(LocalDateTime.now()+" "+Thread.currentThread().getName()+" "+ finalI);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
2021-06-16T10:19:20.117 pool-1-thread-1 0
2021-06-16T10:19:23.122 pool-1-thread-1 1
2021-06-16T10:19:26.132 pool-1-thread-1 2
2021-06-16T10:19:29.147 pool-1-thread-1 3
2021-06-16T10:19:32.147 pool-1-thread-1 4
2021-06-16T10:19:35.152 pool-1-thread-1 5
2021-06-16T10:19:38.155 pool-1-thread-1 6
2021-06-16T10:19:41.165 pool-1-thread-1 7
2021-06-16T10:19:44.166 pool-1-thread-1 8
2021-06-16T10:19:47.180 pool-1-thread-1 9
只有一个线程,所以线程名均相同,且是每隔3秒按顺序输出的
1.2 通过ThreadPoolExecutor类
ThreadPoolExecutor类提供了4种构造方法,可以根据需要来自定义。
1.2.1 线程池执行规则
当线程数小于corePoolSize时,创建一个线程;
当线程数大于等于corePoolSize时,且任务队列未满,将任务放在任务队列中;
当线程数大于等于corePoolSize时,任务队列已满;
若线程数小于最大线程数,创建线程;
若线程数等于最大线程数,抛出异常,拒绝任务。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//省略
}
可以看到有7个参数:
1.2.2 参数说明
1)corePoolSize 核心线程数,线程池中始终存活的线程数
2)maximumPoolSize 最大线程数,线程池中允许的最大线程数
3)keepAliveTime 存活时间,线程没有任务执行时最多保持多久时间会终止
4)unit 参数keepAliveTime的时间单位
TimeUnit.DAYS 天
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分
TimeUnit.SECONDS 秒
TimeUnit.MILLISECONDS 毫秒
TimeUnit.MICROSECONDS 微妙
TimeUnit.NANOSECONDS 纳秒
5)workQueue 阻塞队列,用来存储等待执行的任务,均为线程安全
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。
常用的是LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
6)threadFactory 线程工厂,主要用来创建线程,默及正常优先级、非守护线程
7)handler 拒绝策略,拒绝处理任务时的策略。默认为AbortPolicy。
AbortPolicy 拒绝并抛出异常。
CallerRunsPolicy 重试提交当前的任务,即再次调用运行该任务的execute()方法。
DiscardOldestPolicy 抛弃队列头部(最旧)的一个任务,并执行当前任务。
DiscardPolicy 抛弃当前任务。
1.2.3 代码实例
public static void thread5() {
ExecutorService executorService = new ThreadPoolExecutor(2, 10, 1, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(5, true));
for (int i = 0; i < 10; i++) {
int index=i;
executorService.execute(()->{
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " " + index);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
2021-06-16T15:32:41.801 pool-1-thread-2 1
2021-06-16T15:32:41.801 pool-1-thread-3 7
2021-06-16T15:32:41.801 pool-1-thread-5 9
2021-06-16T15:32:41.801 pool-1-thread-1 0
2021-06-16T15:32:41.801 pool-1-thread-4 8
2021-06-16T15:32:43.817 pool-1-thread-3 2
2021-06-16T15:32:43.818 pool-1-thread-4 3
2021-06-16T15:32:43.818 pool-1-thread-2 4
2021-06-16T15:32:43.818 pool-1-thread-5 5
2021-06-16T15:32:43.818 pool-1-thread-1 6
核心线程数是2,阻塞队列是5,存活时间为1分钟。所以任务流程是:
1.2.4 拒绝策略
上面说了线程池还有拒绝策略,就是当前线程数=最大线程数,阻塞队列也满了,就会自动执行拒绝策略,抛出异常。
代码实例
public static void thread6() {
ExecutorService executorService = new ThreadPoolExecutor(2, 10, 1, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(5, true));
for (int i = 0; i < 20; i++) {
int index = i;
executorService.execute(() -> {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " " + index);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.ssw.config.Test$$Lambda$1/1516369375@26f67b76 rejected from java.util.concurrent.ThreadPoolExecutor@69d9c55[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.ssw.config.Test.thread6(Test.java:93)
at com.ssw.config.Test.main(Test.java:105)
2021-06-16T15:55:26.457 pool-1-thread-7 11
2021-06-16T15:55:26.457 pool-1-thread-9 13
2021-06-16T15:55:26.457 pool-1-thread-10 14
2021-06-16T15:55:26.457 pool-1-thread-2 1
2021-06-16T15:55:26.457 pool-1-thread-5 9
2021-06-16T15:55:26.457 pool-1-thread-6 10
2021-06-16T15:55:26.457 pool-1-thread-1 0
2021-06-16T15:55:26.457 pool-1-thread-4 8
2021-06-16T15:55:26.457 pool-1-thread-8 12
2021-06-16T15:55:26.457 pool-1-thread-3 7
2021-06-16T15:55:28.463 pool-1-thread-8 2
2021-06-16T15:55:28.463 pool-1-thread-3 3
2021-06-16T15:55:28.463 pool-1-thread-1 4
2021-06-16T15:55:28.463 pool-1-thread-7 5
2021-06-16T15:55:28.463 pool-1-thread-10 6
2. 项目实践
SpringBoot项目中使用线程池,场景如下:一个方法,调用两个接口。假如接口1需要执行5秒,接口2需要执行8秒。
2.1 不使用线程池
不使用线程池,默认按顺序执行,5秒+8秒,该方法最少需要13秒执行完毕。
@RequestMapping("thread")
public String asd() throws InterruptedException, ExecutionException {
log.info("开始任务");
String as = test1();
String bs = test2();
log.info("结束任务");
return as + "---" + bs;
}
public String test1() throws InterruptedException {
Thread.sleep(5000); // 模拟耗时
log.info("test1执行结束");
return "123456";
}
public String test2() throws InterruptedException {
Thread.sleep(8000); // 模拟耗时
log.info("test2执行结束");
return "456789";
}
该测试方法,提供了两个接口,最大打印出返回结果。
存在的问题:正常情况下是没问题的,但是实际业务肯定是有很多的逻辑处理,如果要正常跑完一个复杂的接口,效率就变得非常低了。比如:去食堂打饭,只有一个工作人员负责打饭。如果早去一会,排队的同学比较少,肯定没什么问题;但是等下课,同学们在一时间都来了,是不是就会有点忙不过来了呢。使用线程也是这个道理,既然人多,我就多增加几个工作人员负责打饭,同时进行。
是不是效率一下就提升了,回到正题:一个请求需要5秒,一个请求需要8秒,如果两个请求同时执行,是不是8秒就可以执行完成,效率是不是就提升了呢?
2.2 使用线程池 ThreadPoolTaskExecutor
读到这里,你是否有疑问。上面说的是ThreadPoolExecutor,怎么这里变成了ThreadPoolTaskExecutor。
这里跟大家讲一下这两者的区别:
ThreadPoolTaskExecutor使用了ThreadPoolExecutor并增强,扩展了更多特性;
ThreadPoolTaskExecutor只关注自己增强的部分,任务执行还是ThreadPoolExecutor处理;
ThreadPoolTaskExecutor实现了InitializingBean, DisposableBean等,具有spring特性,ThreadPoolExecutor不提供spring声明周期和参数装配;
用哪个都可以,如果是spring项目,推荐使用ThreadPoolTaskExecutor。
2.2.1 异步线程池配置
@Log4j2
@EnableAsync
@Configuration
public class ThreadPoolTaskConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
log.info("开启线程池");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(5);
//最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
//缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(30);
//允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
//线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("timing-task-taskexecutor-");
//线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;
//如果执行程序已关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
executor.afterPropertiesSet();
return executor;
}
}
2.2.2 修改两个测试接口
public interface TestService {
Future<String> test1() throws InterruptedException;
Future<String> test2() throws InterruptedException;
}
@Log4j2
@Service
public class TestServiceImpl implements TestService {
@Override
@Async("taskExecutor")
public Future<String> test1() throws InterruptedException {
Thread.sleep(5000); // 模拟耗时
log.info("发送短信方法---- 1 执行结束");
return new AsyncResult<String>("123456");
}
@Override
@Async("taskExecutor")
public Future<String> test2() throws InterruptedException {
Thread.sleep(8000); // 模拟耗时
log.info("发送短信方法---- 2 执行结束");
return new AsyncResult<String>("456789");
}
}
2.2.3 控制层调用
@Log4j2
@RestController
@RequestMapping("test")
public class TestThreadPoolController {
@Autowired
private TestService testService;
@RequestMapping("thread1")
public String asd1() throws InterruptedException, ExecutionException {
log.info("开始任务");
Future<String> as = testService.test1();
Future<String> bs = testService.test2();
log.info("结束任务");
return as.get() + "---" + bs.get();
}
}
至此,就完成了我们想要的效果,同步执行。其实呢,也不能说是使用线程池实现了并行执行,使用多线程也可以。我们只是将创建线程的操作,交给了线程池,在项目启动完成后就启动了若干线程,使用线程池来管理线程。
2.3 失效场景
2.3.1 必须增加@EnableAsync和@Async注解
@EnableAsync用来开启项目异步支持,@Async用来开启对某个方法进行异步执行。
2.3.2 异步方法需要纳入到Spring的bean中
通过@Component注解,或其他注解纳入Spring的bean容器中。
2.3.3 调用时通过自动装配
类中使用@Autowired或@Resource等注解自动注入,不能手动new对象。
2.3.4 异步方法不能使用static修饰
2.3.5 调用方不能和异步方法不能在同一个类
如下:
@RequestMapping("thread2")
public String asd12() throws InterruptedException, ExecutionException {
log.info("开始任务");
Future<String> as = test1();
Future<String> bs = test2();
log.info("结束任务");
return as.get() + "---" + bs.get();
}
@Async("taskExecutor")
public Future<String> test1() throws InterruptedException {
Thread.sleep(5000); // 模拟耗时
log.info("发送短信方法---- 1 执行结束");
return new AsyncResult<String>("123456");
}
@Async("taskExecutor")
public Future<String> test2() throws InterruptedException {
Thread.sleep(8000); // 模拟耗时
log.info("发送短信方法---- 2 执行结束");
return new AsyncResult<String>("456789");
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/143379.html