一.自定义线程池
1、导入pom
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
2.创建异步配置类AsyncTaskConfig
/**
* @author wangli
* @create 2022-10-15 17:08
*/
@EnableAsync// 支持异步操作
@Configuration
public class AsyncTaskConfig {
@Bean("async-executor-spring")
public Executor AsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(50);
// 缓存队列
executor.setQueueCapacity(20);
// 空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(200);
// 异步方法内部线程名称
executor.setThreadNamePrefix("async-executor-spring");
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 1、corePoolSize:核心线程数
* * 核心线程会一直存活,及时没有任务需要执行
* * 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
* * 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
*
* 2、queueCapacity:任务队列容量(阻塞队列)
* * 当核心线程数达到最大时,新任务会放在队列中排队等待执行
*
* 3、maxPoolSize:最大线程数
* * 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
* * 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
*
* 4、 keepAliveTime:线程空闲时间
* * 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
* * 如果allowCoreThreadTimeout=true,则会直到线程数量=0
*
* 5、allowCoreThreadTimeout:允许核心线程超时
* 6、rejectedExecutionHandler:任务拒绝处理器
* * 两种情况会拒绝处理任务:
* - 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
* - 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
* * 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
* * ThreadPoolExecutor类有几个内部实现类来处理这类情况:
* - AbortPolicy 丢弃任务,抛运行时异常
* - CallerRunsPolicy 执行任务
* - DiscardPolicy 忽视,什么都不会发生
* - DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* * 实现RejectedExecutionHandler接口,可自定义处理器
*/
/**
* com.google.guava中的线程池
* @return
*/
@Bean("async-executor-guava")
public Executor GuavaAsyncExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-executor-guava").build();
// 当前可用cpu数
//最佳线程数可通过计算得出http://ifeve.com/how-to-calculate-threadpool-size/
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
/**
* int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue<Runnable> workQueue,
* ThreadFactory threadFactory
*/
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
200, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
//允许核心线程超时
threadPool.allowsCoreThreadTimeOut();
return threadPool;
}
}
3.创建controller类
/**
* @author wangli
* @create 2022-10-15 17:48
*/
@Controller
public class AsyncController {
@Autowired
private AsyncService asyncService;
@PostMapping("/AsyncMethond")
public void AsyncMethond(){
asyncService.AsyncMethond();
}
}
4.创建service类
/**
* @author wangli
* @create 2022-10-15 17:49
*/
public interface AsyncService {
public void AsyncMethond();
}
5.创建serviceimpl
/**
* @author wangli
* @create 2022-10-15 17:49
*/
@Service
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("async-executor-guava")
public void AsyncMethond() {
System.out.println("调用异步方法");
}
}
二.@Async注解
@Async的作用就是异步处理任务。
- 在方法上添加@Async,表示此方法是异步方法;
- 在类上添加@Async,表示类中的所有方法都是异步方法;
- 使用此注解的类,必须是Spring管理的类;
- 需要在启动类或配置类中加入@EnableAsync注解,@Async才会生效;
- 在使用@Async时,如果不指定线程池的名称,也就是不自定义线程池,@Async是有默认线程池的,使用的是Spring默认的线程池SimpleAsyncTaskExecutor。
默认线程池的默认配置如下:
- 默认核心线程数:8;
- 最大线程数:Integet.MAX_VALUE;
- 队列使用LinkedBlockingQueue;
- 容量是:Integet.MAX_VALUE;
- 空闲线程保留时间:60s;
- 线程池拒绝策略:AbortPolicy;
从最大线程数可以看出,在并发情况下,会无限制的创建线程
也可以通过yml重新配置:
spring:
task:
execution:
pool:
max-size: 10
core-size: 5
keep-alive: 3s
queue-capacity: 1000
thread-name-prefix: my-executor
三.为什么异步会失败
- 注解@Async的方法不是public方法;
- 注解@Async的返回值只能为void或Future;
- 注解@Async方法使用static修饰也会失效;
- 没加@EnableAsync注解;
- 调用方和@Async不能在一个类中;
- 在Async方法上标注@Transactional是没用的,但在Async方法调用的方法上标注@Transcational是有效的;
四.线程池执行流程
五.如何合理规划线程池的大小
这个问题虽然看起来很小,却并不那么容易回答。大家如果有更好的方法欢迎赐教,先来一个天真的估算方法:假设要求一个系统的TPS(Transaction Per Second或者Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s。那么问题转化为:
如何设计线程池大小,使得可以在1s内处理完20个Transaction?
计算过程很简单,每个线程的处理能力为0.25TPS,那么要达到20TPS,显然需要20/0.25=80个线程。
很显然这个估算方法很天真,因为它没有考虑到CPU数目。一般服务器的CPU核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销。
再来第二种简单的但不知是否可行的方法(N为CPU总核数):
- 如果是CPU密集型应用,则线程池大小设置为N+1
- 如果是IO密集型应用,则线程池大小设置为2N+1
如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。
接下来在这个文档:服务器性能IO优化 中发现一个估算公式:
1 |
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 |
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
1 |
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目 |
可以得出一个结论:
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
上一种估算方法也和这个结论相合。
一个系统最快的部分是CPU,所以决定一个系统吞吐量上限的是CPU。增强CPU处理能力,可以提高系统吞吐量上限。但根据短板效应,真实的系统吞吐量并不能单纯根据CPU来计算。那要提高系统吞吐量,就需要从“系统短板”(比如网络延迟、IO)着手:
- 尽量提高短板操作的并行化比率,比如多线程下载技术
- 增强短板能力,比如用NIO替代IO
第一条可以联系到Amdahl定律,这条定律定义了串行系统并行化后的加速比计算公式:
1 |
加速比=优化前系统耗时 / 优化后系统耗时 |
加速比越大,表明系统并行化的优化效果越好。Addahl定律还给出了系统并行度、CPU数目和加速比的关系,加速比为Speedup,系统串行化比率(指串行执行代码所占比率)为F,CPU数目为N:
1 |
Speedup <= 1 / (F + ( 1 -F)/N) |
当N足够大时,串行化比率F越小,加速比Speedup越大。
写到这里,我突然冒出一个问题。
是否使用线程池就一定比使用单线程高效呢?
答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:
- 多线程带来线程上下文切换开销,单线程就没有这种开销
- 锁
当然“Redis很快”更本质的原因在于:Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。
所以即使有上面的简单估算方法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是IO密集型或者是CPU密集型或者是纯内存操作)和硬件环境(CPU、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/64375.html