SpringBoot中@Async的使用总结

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。SpringBoot中@Async的使用总结,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

对于执行耗时的方法,一般业务中会通过Async方式开启新线程执行,这样耗时的程序就不会阻塞当前方法的执行,在开启新线程时,一种方式是通过new Thread().start()的方式开启,还有一种方式是在方法上使用@Async注解方式实现。

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.11</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.7</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

@Async注解是spring中提供的,要使用它,需要在启动类上通过@EnableAsync开启该功能,并且注意在spring中是通过代理方式实现开启新线程的,所以在同一个类中调用@Async标记的方法不能实现异步效果。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @Author xingo
 * @Date 2023/11/1
 */
@SpringBootApplication
@EnableAsync
public class ProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

@Async默认使用线程工厂的配置类是 org.springframework.boot.autoconfigure.task.TaskExecutionProperties ,打开该类内容里面有一个Pool类包含的字段内容如下:

/**
 * 队列容量
 */
private int queueCapacity = Integer.MAX_VALUE;

/**
 * 核心线程数
 */
private int coreSize = 8;

/**
 * 最大线程数
 */
private int maxSize = Integer.MAX_VALUE;

队列容量和允许的最大线程数都是int的最大值,可以认为是无界队列,线程数量也没有上限,而核心线程数只有8,也就是说不管我们提交多少任务进来,最多只会有8个线程执行任务,这种配置对于io密集型的互联网项目并不适用,所以我们一般都是会自己按照项目修改配置来满足需求,在 TaskExecutionProperties 配置类上面有 @ConfigurationProperties(“spring.task.execution”) 注解,也就是说可以通过设置application.yaml中的属性来调整参数。

首先设计一个服务类来打印当前执行的线程:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 异步服务
 * 
 * @Author xingo
 * @Date 2023/11/1
 */
@Service
public class AsyncService {

    @Async
    public void print() {
        System.out.println(Thread.currentThread().getName() + " work");
    }
}

调用上面的方法,在没有设置参数前输出的内容如下:

task-1 work
task-2 work
task-3 work
task-4 work
task-5 work
task-6 work
task-7 work
task-8 work
task-1 work
task-2 work
task-3 work
task-4 work
task-5 work
...

下面在application.yaml中配置如下参数值:

spring:
  task:
    execution:
      thread-name-prefix: myThread-
      pool:
        queue-capacity: 256
        core-size: 16
        max-size: 64
        allow-core-thread-timeout: false
        keep-alive: 60

再次执行上面的程序,打印内容如下:

myThread-1 work
myThread-2 work
myThread-3 work
myThread-4 work
myThread-5 work
myThread-6 work
myThread-7 work
myThread-8 work
myThread-9 work
myThread-10 work
myThread-11 work
myThread-12 work
myThread-13 work
myThread-14 work
myThread-15 work
myThread-16 work
myThread-1 work
myThread-2 work
myThread-3 work
...

通过打印输出可以看到,确实对@Async注解有关线程池的参数产生了影响,这种是全局的参数调整,更多时候我们只想对一部分方法配置单独的线程池进行处理,其他方法再用另外的配置,这时候我们就需要使用自定义线程池进行处理,配置线程池过程如下:

首先定义一个配置类,这个类可以读取application.yaml中的配置内容:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * 自定义线程池配置
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Configuration
@ConfigurationProperties("mythead.pool")
public class MyThreadPoolConfig {

    /**
     * 核心线程数
     */
    private int coreSize = 8;

    /**
     * 最大线程数
     */
    private int maxSize = 256;

    /**
     * 线程空闲时间(秒),超过这个时间的线程会被回收
     */
    private int keepAliveSeconds = 60;

    /**
     * 阻塞队列容量
     */
    private int queueCapacity = Integer.MAX_VALUE;

    /**
     * 线程名称前缀
     */
    private String threadNamePrefix = "myThread-";

    public int getCoreSize() {
        return coreSize;
    }

    public void setCoreSize(int coreSize) {
        this.coreSize = coreSize;
    }

    public int getMaxSize() {
        return maxSize;
    }

    public void setMaxSize(int maxSize) {
        this.maxSize = maxSize;
    }

    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public int getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public String getThreadNamePrefix() {
        return threadNamePrefix;
    }

    public void setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }
}

在application.yaml中进行如下配置:

mythead:
  pool:
    thread-name-prefix: myThreadPool-
    core-size: 4
    max-size: 16
    keep-alive-seconds: 60
    queue-capacity: 256

在定义一个线程池,并将它注入到spring中:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class MyThreadPoolFactory {

    @Autowired
    private MyThreadPoolConfig config;

    @Bean("myThreadPool")
    public Executor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCoreSize());
        executor.setMaxPoolSize(config.getMaxSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 如果没有注入为spring的Bean,需要自己手动去调用 initialize() 方法
        // executor.initialize();
        return executor;
    }
}

调整异步处理服务,里面有两个打印线程的方法,一个使用了我们指定的线程池,一个没有指定:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 异步服务
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Service
public class AsyncService {

    @Async("myThreadPool")
    public void print() {
        System.out.println(Thread.currentThread().getName() + " work");
    }

    @Async
    public void print1() {
        System.out.println(Thread.currentThread().getName() + " work1");
    }
}

分别调用两个方法,输出内容如下:

myThreadPool-2 work1
myThreadPool-1 work
myThreadPool-3 work
myThreadPool-4 work1
myThreadPool-2 work
myThreadPool-1 work1
...

可以看到这里的两个方法都使用的是我们定义的线程池,并没有用到系统默认的线程池,要想让没有指定线程池名称的@Async方法使用系统默认的线程池,需要添加下面这个类:

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Component
public class DefinedThreadPoolFactory implements AsyncConfigurer {

    @Autowired
    private TaskExecutionProperties config;

    @Override
    public Executor getAsyncExecutor() {
        // 创建线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getPool().getCoreSize());
        executor.setMaxPoolSize(config.getPool().getMaxSize());
        executor.setQueueCapacity(config.getPool().getQueueCapacity());
        executor.setKeepAliveSeconds((int) config.getPool().getKeepAlive().getSeconds());
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 不是作为Bean注入到spring的时候,需要自己手动去调用initialize()方法 不然报错
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println(Thread.currentThread().getName() + "|" + method.getName() + "|" + ex);
            }
        };
    }
}

重新调用上面异步处理的方法,通过下面的输出内容可以看到,没有指定的线程池名称的@Async方法使用了系统中定义的线程池:

myThreadPool-1 work
myThread-1 work1
myThreadPool-2 work
myThread-2 work1
myThreadPool-3 work
myThread-3 work1
myThread-4 work1
myThreadPool-4 work
...

下面观察一个现象,把上面的DefinedThreadPoolFactory类删除掉,将MyThreadPoolFactory类调整为下面这样:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class MyThreadPoolFactory {

    @Autowired
    private MyThreadPoolConfig config;

    @Bean("myThreadPool")
    public Executor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCoreSize());
        executor.setMaxPoolSize(config.getMaxSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    @Autowired
    private TaskExecutionProperties properties;

    @Bean("defaultThreadPool")
    public Executor defaultThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(properties.getPool().getCoreSize());
        executor.setMaxPoolSize(properties.getPool().getMaxSize());
        executor.setQueueCapacity(properties.getPool().getQueueCapacity());
        executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().getSeconds());
        executor.setThreadNamePrefix(properties.getThreadNamePrefix());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

也就是将系统线程池也通过Bean的方式注入,这时在调用上面的打印线程名方法,看到输出内容变为下面这样:

SimpleAsyncTaskExecutor-1 work1
myThreadPool-1 work
myThreadPool-2 work
SimpleAsyncTaskExecutor-2 work1
myThreadPool-3 work
SimpleAsyncTaskExecutor-3 work1
myThreadPool-4 work
SimpleAsyncTaskExecutor-4 work1
myThreadPool-1 work
SimpleAsyncTaskExecutor-5 work1
myThreadPool-2 work
SimpleAsyncTaskExecutor-6 work1
myThreadPool-3 work
SimpleAsyncTaskExecutor-7 work1
myThreadPool-4 work
SimpleAsyncTaskExecutor-8 work1
...

指定线程池名的@Async方法还是使用该线程池创建线程,没有指定线程池名的@Async方法使用的是SimpleAsyncTaskExecutor线程池,并没有使用另外的线程池,这里需要注意SimpleAsyncTaskExecutor线程池每提交一个任务就会创建新的线程去执行,这样会极大的影响性能,没有达到线程复用的目的,在开发过程中一定要注意这种问题;其实在@EnableAsync注释中也有说明:

By default, Spring will be searching for an associated thread pool definition: either a unique org.springframework.core.task.TaskExecutor bean in the context, or an java.util.concurrent.Executor bean named “taskExecutor” otherwise. If neither of the two is resolvable, a org.springframework.core.task.SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.

所以,要解决上面出现的问题,我们可以有两种方式:

  1. 在项目中定义一个类实现AsyncConfigurer接口。

  2. 如果没有这个实现类,需要通过Bean的方式注入一个名为taskExecutor的Bean:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    public class MyThreadPoolFactory {
    
        @Autowired
        private MyThreadPoolConfig config;
    
        @Bean("myThreadPool")
        public Executor myThreadPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(config.getCoreSize());
            executor.setMaxPoolSize(config.getMaxSize());
            executor.setQueueCapacity(config.getQueueCapacity());
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            executor.setThreadNamePrefix(config.getThreadNamePrefix());
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    
        @Autowired
        private TaskExecutionProperties properties;
    
        @Bean("taskExecutor")
        public Executor defaultThreadPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(properties.getPool().getCoreSize());
            executor.setMaxPoolSize(properties.getPool().getMaxSize());
            executor.setQueueCapacity(properties.getPool().getQueueCapacity());
            executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().getSeconds());
            executor.setThreadNamePrefix(properties.getThreadNamePrefix());
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    }
    

综合上面的内容总结如下:

  1. 使用@Async时需要在启动类中添加@EnableAsync注解开启该功能。
  2. 被@Async标记的方法在调用时需要注意避免在同一个类中不同方法间调用,因为spring是通过代理方式实现的,同一个类中的调用会导致不会创建新的线程执行方法。
  3. 在项目中使用Bean方式自定义了线程池,为了避免未指定线程池的方法能够正常执行,需要在项目中定义一个名为taskExecutor的线程池或者自定义一个类实现AsyncConfigurer接口。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181846.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!