异步注解@Async自定义线程池

导读:本篇文章讲解 异步注解@Async自定义线程池,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

一.自定义线程池

1、导入pom

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>

2.创建异步配置类AsyncTaskConfig

/**
 * @author wangli
 * @create 2022-10-15 17:08
 */
@EnableAsync// 支持异步操作
@Configuration
public class AsyncTaskConfig {

    @Bean("async-executor-spring")
    public Executor AsyncExecutor(){
        ThreadPoolTaskExecutor  executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(10);
        // 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(50);
        // 缓存队列
        executor.setQueueCapacity(20);
        // 空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(200);
        // 异步方法内部线程名称
        executor.setThreadNamePrefix("async-executor-spring");

        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     *  1、corePoolSize:核心线程数
     *         * 核心线程会一直存活,及时没有任务需要执行
     *         * 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
     *         * 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
     *
     *     2、queueCapacity:任务队列容量(阻塞队列)
     *         * 当核心线程数达到最大时,新任务会放在队列中排队等待执行
     *
     *     3、maxPoolSize:最大线程数
     *         * 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
     *         * 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
     *
     *     4、 keepAliveTime:线程空闲时间
     *         * 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
     *         * 如果allowCoreThreadTimeout=true,则会直到线程数量=0
     *
     *     5、allowCoreThreadTimeout:允许核心线程超时
     *     6、rejectedExecutionHandler:任务拒绝处理器
     *         * 两种情况会拒绝处理任务:
     *             - 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
     *             - 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
     *         * 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
     *         * ThreadPoolExecutor类有几个内部实现类来处理这类情况:
     *             - AbortPolicy 丢弃任务,抛运行时异常
     *             - CallerRunsPolicy 执行任务
     *             - DiscardPolicy 忽视,什么都不会发生
     *             - DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
     *         * 实现RejectedExecutionHandler接口,可自定义处理器
     */

    /**
     * com.google.guava中的线程池
     * @return
     */
    @Bean("async-executor-guava")
    public Executor GuavaAsyncExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-executor-guava").build();
        // 当前可用cpu数
        //最佳线程数可通过计算得出http://ifeve.com/how-to-calculate-threadpool-size/
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        /**
         * int corePoolSize,
         * int maximumPoolSize,
         * long keepAliveTime,
         * TimeUnit unit,
         * BlockingQueue<Runnable> workQueue,
         * ThreadFactory threadFactory
         */
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        //允许核心线程超时
        threadPool.allowsCoreThreadTimeOut();
        return threadPool;
    }

}

3.创建controller类

/**
 * @author wangli
 * @create 2022-10-15 17:48
 */
@Controller
public class AsyncController {
    @Autowired
    private AsyncService asyncService;

    @PostMapping("/AsyncMethond")
    public void AsyncMethond(){
        asyncService.AsyncMethond();
    }
}

4.创建service类

/**
 * @author wangli
 * @create 2022-10-15 17:49
 */
public interface AsyncService {

    public void AsyncMethond();
}

5.创建serviceimpl

/**
 * @author wangli
 * @create 2022-10-15 17:49
 */
@Service
public class AsyncServiceImpl implements AsyncService {
    @Override
    @Async("async-executor-guava")
    public void AsyncMethond() {
        System.out.println("调用异步方法");
    }
}

二.@Async注解

@Async的作用就是异步处理任务。

  • 在方法上添加@Async,表示此方法是异步方法;
  • 在类上添加@Async,表示类中的所有方法都是异步方法;
  • 使用此注解的类,必须是Spring管理的类;
  • 需要在启动类或配置类中加入@EnableAsync注解,@Async才会生效;
  • 在使用@Async时,如果不指定线程池的名称,也就是不自定义线程池,@Async是有默认线程池的,使用的是Spring默认的线程池SimpleAsyncTaskExecutor。

默认线程池的默认配置如下:

  • 默认核心线程数:8;
  • 最大线程数:Integet.MAX_VALUE;
  • 队列使用LinkedBlockingQueue;
  • 容量是:Integet.MAX_VALUE;
  • 空闲线程保留时间:60s;
  • 线程池拒绝策略:AbortPolicy;

从最大线程数可以看出,在并发情况下,会无限制的创建线程

也可以通过yml重新配置:

spring:
  task:
    execution:
      pool:
        max-size: 10
        core-size: 5
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: my-executor

三.为什么异步会失败

  1. 注解@Async的方法不是public方法;
  2. 注解@Async的返回值只能为void或Future;
  3. 注解@Async方法使用static修饰也会失效;
  4. 没加@EnableAsync注解;
  5. 调用方和@Async不能在一个类中;
  6. 在Async方法上标注@Transactional是没用的,但在Async方法调用的方法上标注@Transcational是有效的;

四.线程池执行流程

在这里插入图片描述

 

五.如何合理规划线程池的大小

这个问题虽然看起来很小,却并不那么容易回答。大家如果有更好的方法欢迎赐教,先来一个天真的估算方法:假设要求一个系统的TPS(Transaction Per Second或者Task Per Second)至少为20,然后假设每个Transaction由一个线程完成,继续假设平均每个线程处理一个Transaction的时间为4s。那么问题转化为:

如何设计线程池大小,使得可以在1s内处理完20个Transaction?

计算过程很简单,每个线程的处理能力为0.25TPS,那么要达到20TPS,显然需要20/0.25=80个线程。

很显然这个估算方法很天真,因为它没有考虑到CPU数目。一般服务器的CPU核数为16或者32,如果有80个线程,那么肯定会带来太多不必要的线程上下文切换开销。

再来第二种简单的但不知是否可行的方法(N为CPU总核数):

  • 如果是CPU密集型应用,则线程池大小设置为N+1
  • 如果是IO密集型应用,则线程池大小设置为2N+1

如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。

接下来在这个文档:服务器性能IO优化 中发现一个估算公式:

1 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:

1 最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

可以得出一个结论:

线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

上一种估算方法也和这个结论相合。

一个系统最快的部分是CPU,所以决定一个系统吞吐量上限的是CPU。增强CPU处理能力,可以提高系统吞吐量上限。但根据短板效应,真实的系统吞吐量并不能单纯根据CPU来计算。那要提高系统吞吐量,就需要从“系统短板”(比如网络延迟、IO)着手:

  • 尽量提高短板操作的并行化比率,比如多线程下载技术
  • 增强短板能力,比如用NIO替代IO

第一条可以联系到Amdahl定律,这条定律定义了串行系统并行化后的加速比计算公式:

1 加速比=优化前系统耗时 / 优化后系统耗时

加速比越大,表明系统并行化的优化效果越好。Addahl定律还给出了系统并行度、CPU数目和加速比的关系,加速比为Speedup,系统串行化比率(指串行执行代码所占比率)为F,CPU数目为N:

1 Speedup <= 1 / (F + (1-F)/N)

当N足够大时,串行化比率F越小,加速比Speedup越大。

写到这里,我突然冒出一个问题。

是否使用线程池就一定比使用单线程高效呢?

答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:

  • 多线程带来线程上下文切换开销,单线程就没有这种开销

当然“Redis很快”更本质的原因在于:Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。

所以即使有上面的简单估算方法,也许看似合理,但实际上也未必合理,都需要结合系统真实情况(比如是IO密集型或者是CPU密集型或者是纯内存操作)和硬件环境(CPU、内存、硬盘读写速度、网络状况等)来不断尝试达到一个符合实际的合理估算值。

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/64375.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!