上一篇文章已经写到了实现线程的几种方式:Java异步任务 线程详解(1)
一、说说为什么要用到线程池
在处理高并发的时候,往往会考虑到使用多线程的方式来提高速率,
但是我们常见的
- Thread01 extends Thread
- Runable01 implements Runnable
- Callable01 implements Callable
这三种方式是不建议直接使用的,具体是为什么呢?我举个列子大家应该就会懂了。
比如说一个公司技术开发人员有10个人,为了提高效率,每次有一个新的需求就会去招聘一个开发人员,这样是提高了很大的开发效率,但是随之而来的一个问题就是随着需求的完成,公司的开发人员会越来越多,但是需求已经做完了,那么这些开发人员就会没什么事干,导致了公司的运营成本越来越大,最后导致公司破产。回到我们代码当中也是一样的,每创建一个线程就会消耗一定的资源,你每个请求都创建很多线程,cpu资源是有限的,最后就会导致cpu100%而使得整个服务器奔溃。
而采用线程池的优点如下:
- 降低资源消耗。可以重复利用已经创建好的线程,降低线程的创建和销毁带来的损耗。
- 提高响应速度。因为线程池中的线程都处于等待分配任务状态,当任务过来时可以直接执行。
- 提高线程的可管理性。如果是单cpu的话,创建多个线程,会导致资源耗尽,但是线程池有拒绝策略;另外还可以核心业务和非核心业务两种线程池,如果某个时间内存压力大,可以释放掉非核心业务线程池。使用线程池就可以使线程的管理方便。
二、使用Executors创建线程池
使用Executors创建线程池其实底层还是使用的ThreadPoolExecutor来创建的线程池。
1.Executor的UML图:(常用的几个接口和子类)
Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
2.线程池七大参数
int corePoolSize:核心线程数,线程池创建好就已经准备就绪的线程数 量,等待接收异步任务 ,异步任务进来后,自动执行。核心线程会一直存在,除非设置了allowCoreThreadTimeOut,才允许核心线程超时。
int maximumPoolSize:线程池允许存在的最大线程数
long keepAliveTime:超时时间。如果当前线程数量大于核心数量,且在keepAliveTime时间内保持空闲,就释放掉。 释放的是最大线程数 - 核心线程数
TimeUnit unit: 超时时间单位
BlockingQueue workQueue:阻塞队列,如果线程有很多,就会把线程保存在队列里 只要线程有空闲,就去阻塞队列中取。
分类见下文。
ThreadFactory threadFactory:线程的创建工厂
RejectedExecutionHandler handler:拒绝策略 ,如果线程满了采取的策略
拒绝策略分类:
①:DiscardOldestPolicy 抛弃掉最早进入的线程
②:DiscardPolicy 抛弃掉最新的线程
③:DiscardPolicy 剩余的线程调用run方法,变为同步执行
④:DiscardPolicy 抛弃掉最新的线程,并抛出异常
三、使用Executors创建线程池
//线程池,每个系统应该有一个共有的有一个或者是有两个
public static ExecutorService executor = Executors.newFixedThreadPool(10);
Executors创建线程池的四种方式:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
四、CompletableFuture介绍
CompletableFuture是JDK1.8版本新引入的类。CompletableFuture类实现了CompletionStage和Future接口。
具体的使用方式直接看代码:
单异步任务
/**
* 无返回值的异步任务 runAsync()
*/
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executor);
/**
* 方法完成后的处理
* 有返回值,但是不能修改数据
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res,exception) -> {
//虽然能得到异常信息,但是没法修改返回数据
System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
//可以感知异常,同时返回默认值
return 10;
});
/**
* 方法执行完后端处理
* 有返回值,并且能感知异常并且修改数据
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((result,thr) -> {
if (result != null) {
return result * 2;
}
if (thr != null) {
System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr);
return 0;
}
return 0;
});
线程串行化
线程串行化
1、thenRunL:不能获取上一步的执行结果
2、thenAcceptAsync:能接受上一步结果,但是无返回值
3、thenApplyAsync:能接受上一步结果,有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("任务2启动了..." + res);
return "Hello" + res;
}, executor);
System.out.println("main......end....." + future.get());
/**
* 两个都完成
*/
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一启动线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务一结束:" + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二启动线程:" + Thread.currentThread().getId());
System.out.println("任务二结束:" );
return "hello";
}, executor);
future01.thenAcceptBothAsync(future02,(f1,f2) ->{
System.out.println("得到的结果"+f1+"_>>>>>>>>"+f2);
},executor);
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1 + f2+"哈哈哈";
}, executor);
/**
* 线程串行化
* 1、thenRunL:不能获取上一步的执行结果
* 2、thenAcceptAsync:能接受上一步结果,但是无返回值
* 3、thenApplyAsync:能接受上一步结果,有返回值
*
*/
// CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 2;
// System.out.println("运行结果:" + i);
// return i;
// }, executor).thenApplyAsync(res -> {
// System.out.println("任务2启动了..." + res);
// return "Hello" + res;
// }, executor);
// System.out.println("main......end....." + future.get());
/**
* 两个都完成
*/
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一启动线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务一结束:" + i);
return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二启动线程:" + Thread.currentThread().getId());
System.out.println("任务二结束:" );
return "hello";
}, executor);
// future01.thenAcceptBothAsync(future02,(f1,f2) ->{
// System.out.println("得到的结果"+f1+"_>>>>>>>>"+f2);
// },executor);
// CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
// return f1 + f2+"哈哈哈";
// }, executor);
/**
* 两个任务只要有一个完成 我们就执行任务三
* runAfterEitherAsync :不感知结果 自己也无返回值
*/
future01.runAfterEitherAsync(future02,()->{
System.out.println("得到的结果+");
},executor);
/**
* 两个任务只要有一个完成 我们就执行任务三
* acceptEitherAsync :可以感知结果 自己也无返回值
*/
future01.acceptEitherAsync(future02,(res)->{
System.out.println(res);
},executor);
/**
* 两个任务只要有一个完成 我们就执行任务三
* applyToEitherAsync :可以感知结果 也有返回值
*/
future01.applyToEitherAsync(future02,(res)->{
return res.toString();
},executor);
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
},executor);
CompletableFuture<String> futureInfo = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性");
return "黑色 256G";
},executor);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品介绍");
return "华为";
},executor);
五、多任务执行
- allOf:全部线程结束以后才会执行
- anyOf:只要有一个线程执行完成就会执行
/**
* allOf:全部线程结束以后才会执行
* anyOf:只要有一个线程执行完成就会执行
*/
// CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureInfo, futureDesc);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureInfo, futureDesc);
// allOf.get();//等待所有结果完成
anyOf.get();
//还可以得到上面的所有现成的结果
System.out.println("main......end....."+futureInfo.get()+futureDesc.get()+futureImg.get());
六、项目实战
项目为SpringBoot项目,以获取商城商品信息为列:
- 配置类MyThreadPoolConfig
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
public class MyThreadPoolConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties properties) {
return new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
- 配置类:ThreadPoolConfigProperties
@ConfigurationProperties(prefix = "duoduo.thread")
@Data
public class ThreadPoolConfigProperties {
/**
* 核心线程大小
*/
private int corePoolSize;
/**
* 最大线程大小
*/
private int maxPoolSize;
/**
* 休眠时间
*/
private long keepAliveTime;
}
- application.yml配置文件
# 自定义线程池参数
duoduo:
thread:
corePoolSize: 10
maxPoolSize: 200
keepAliveTime: 10
- 使用多线程业务代码
//线程池
@Autowired
private ThreadPoolExecutor executor;
@Override
public SkuItemVo item(Long skuId) {
SkuItemVo skuItemVo = new SkuItemVo();
//方法完成后的处理,supplyAsync有返回值
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
//线程完成后无返回值,runAsync
//2、sku的图片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> skuImagesEntities = skuImagesService.list(new QueryWrapper<SkuImagesEntity>().eq("sku_id", skuId));
skuItemVo.setImages(skuImagesEntities);
}, executor);
//能接受上一步结果,但是无返回值,thenAcceptAsync
//3、获取spu的销售属性组合-> 依赖1 获取spuId
CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((info) -> {
// List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(info.getSpuId());
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(info.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
//能接受上一步结果,但是无返回值,thenAcceptAsync
//4、获取spu的介绍-> 依赖1 获取spuId
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((info) -> {
SpuInfoDescEntity byId = spuInfoDescService.getById(info.getSpuId());
skuItemVo.setDesc(byId);
}, executor);
//能接受上一步结果,但是无返回值,thenAcceptAsync
//5、获取spu的规格参数信息-> 依赖1 获取spuId catalogId
CompletableFuture<Void> attrFuture = infoFuture.thenAcceptAsync((info) -> {
// List<SpuItemAttrGroupVo> spuItemAttrGroupVos=productAttrValueService.getProductGroupAttrsBySpuId(info.getSpuId(), info.getCatalogId());
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(info.getSpuId(), info.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
//6、秒杀商品的优惠信息
// CompletableFuture<Void> seckFuture = CompletableFuture.runAsync(() -> {
// R r = seckillFeignService.getSeckillSkuInfo(skuId);
// if (r.getCode() == 0) {
// SeckillSkuVo seckillSkuVo = r.getData(new TypeReference<SeckillSkuVo>() {
// });
// long current = System.currentTimeMillis();
// //如果返回结果不为空且活动未过期,设置秒杀信息
// if (seckillSkuVo != null&¤t<seckillSkuVo.getEndTime()) {
// skuItemVo.setSeckillSkuVo(seckillSkuVo);
// }
// }
// }, executor);
//等待所有任务执行完成
try {
CompletableFuture.allOf(imageFuture, saleFuture, descFuture, attrFuture).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return skuItemVo;
}
以上内容可能因具体版本有差异,有问题欢迎留言探讨。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/158988.html