对于执行耗时的方法,一般业务中会通过Async方式开启新线程执行,这样耗时的程序就不会阻塞当前方法的执行,在开启新线程时,一种方式是通过new Thread().start()的方式开启,还有一种方式是在方法上使用@Async注解方式实现。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.11</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
@Async注解是spring中提供的,要使用它,需要在启动类上通过@EnableAsync开启该功能,并且注意在spring中是通过代理方式实现开启新线程的,所以在同一个类中调用@Async标记的方法不能实现异步效果。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* @Author xingo
* @Date 2023/11/1
*/
@SpringBootApplication
@EnableAsync
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
@Async默认使用线程工厂的配置类是 org.springframework.boot.autoconfigure.task.TaskExecutionProperties ,打开该类内容里面有一个Pool类包含的字段内容如下:
/**
* 队列容量
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* 核心线程数
*/
private int coreSize = 8;
/**
* 最大线程数
*/
private int maxSize = Integer.MAX_VALUE;
队列容量和允许的最大线程数都是int的最大值,可以认为是无界队列,线程数量也没有上限,而核心线程数只有8,也就是说不管我们提交多少任务进来,最多只会有8个线程执行任务,这种配置对于io密集型的互联网项目并不适用,所以我们一般都是会自己按照项目修改配置来满足需求,在 TaskExecutionProperties 配置类上面有 @ConfigurationProperties(“spring.task.execution”) 注解,也就是说可以通过设置application.yaml中的属性来调整参数。
首先设计一个服务类来打印当前执行的线程:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 异步服务
*
* @Author xingo
* @Date 2023/11/1
*/
@Service
public class AsyncService {
@Async
public void print() {
System.out.println(Thread.currentThread().getName() + " work");
}
}
调用上面的方法,在没有设置参数前输出的内容如下:
task-1 work
task-2 work
task-3 work
task-4 work
task-5 work
task-6 work
task-7 work
task-8 work
task-1 work
task-2 work
task-3 work
task-4 work
task-5 work
...
下面在application.yaml中配置如下参数值:
spring:
task:
execution:
thread-name-prefix: myThread-
pool:
queue-capacity: 256
core-size: 16
max-size: 64
allow-core-thread-timeout: false
keep-alive: 60
再次执行上面的程序,打印内容如下:
myThread-1 work
myThread-2 work
myThread-3 work
myThread-4 work
myThread-5 work
myThread-6 work
myThread-7 work
myThread-8 work
myThread-9 work
myThread-10 work
myThread-11 work
myThread-12 work
myThread-13 work
myThread-14 work
myThread-15 work
myThread-16 work
myThread-1 work
myThread-2 work
myThread-3 work
...
通过打印输出可以看到,确实对@Async注解有关线程池的参数产生了影响,这种是全局的参数调整,更多时候我们只想对一部分方法配置单独的线程池进行处理,其他方法再用另外的配置,这时候我们就需要使用自定义线程池进行处理,配置线程池过程如下:
首先定义一个配置类,这个类可以读取application.yaml中的配置内容:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* 自定义线程池配置
*
* @Author xingo
* @Date 2023/11/1
*/
@Configuration
@ConfigurationProperties("mythead.pool")
public class MyThreadPoolConfig {
/**
* 核心线程数
*/
private int coreSize = 8;
/**
* 最大线程数
*/
private int maxSize = 256;
/**
* 线程空闲时间(秒),超过这个时间的线程会被回收
*/
private int keepAliveSeconds = 60;
/**
* 阻塞队列容量
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* 线程名称前缀
*/
private String threadNamePrefix = "myThread-";
public int getCoreSize() {
return coreSize;
}
public void setCoreSize(int coreSize) {
this.coreSize = coreSize;
}
public int getMaxSize() {
return maxSize;
}
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
public int getKeepAliveSeconds() {
return keepAliveSeconds;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public String getThreadNamePrefix() {
return threadNamePrefix;
}
public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
}
在application.yaml中进行如下配置:
mythead:
pool:
thread-name-prefix: myThreadPool-
core-size: 4
max-size: 16
keep-alive-seconds: 60
queue-capacity: 256
在定义一个线程池,并将它注入到spring中:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class MyThreadPoolFactory {
@Autowired
private MyThreadPoolConfig config;
@Bean("myThreadPool")
public Executor myThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getCoreSize());
executor.setMaxPoolSize(config.getMaxSize());
executor.setQueueCapacity(config.getQueueCapacity());
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
executor.setThreadNamePrefix(config.getThreadNamePrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 如果没有注入为spring的Bean,需要自己手动去调用 initialize() 方法
// executor.initialize();
return executor;
}
}
调整异步处理服务,里面有两个打印线程的方法,一个使用了我们指定的线程池,一个没有指定:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 异步服务
*
* @Author xingo
* @Date 2023/11/1
*/
@Service
public class AsyncService {
@Async("myThreadPool")
public void print() {
System.out.println(Thread.currentThread().getName() + " work");
}
@Async
public void print1() {
System.out.println(Thread.currentThread().getName() + " work1");
}
}
分别调用两个方法,输出内容如下:
myThreadPool-2 work1
myThreadPool-1 work
myThreadPool-3 work
myThreadPool-4 work1
myThreadPool-2 work
myThreadPool-1 work1
...
可以看到这里的两个方法都使用的是我们定义的线程池,并没有用到系统默认的线程池,要想让没有指定线程池名称的@Async方法使用系统默认的线程池,需要添加下面这个类:
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Component
public class DefinedThreadPoolFactory implements AsyncConfigurer {
@Autowired
private TaskExecutionProperties config;
@Override
public Executor getAsyncExecutor() {
// 创建线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getPool().getCoreSize());
executor.setMaxPoolSize(config.getPool().getMaxSize());
executor.setQueueCapacity(config.getPool().getQueueCapacity());
executor.setKeepAliveSeconds((int) config.getPool().getKeepAlive().getSeconds());
executor.setThreadNamePrefix(config.getThreadNamePrefix());
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 不是作为Bean注入到spring的时候,需要自己手动去调用initialize()方法 不然报错
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.out.println(Thread.currentThread().getName() + "|" + method.getName() + "|" + ex);
}
};
}
}
重新调用上面异步处理的方法,通过下面的输出内容可以看到,没有指定的线程池名称的@Async方法使用了系统中定义的线程池:
myThreadPool-1 work
myThread-1 work1
myThreadPool-2 work
myThread-2 work1
myThreadPool-3 work
myThread-3 work1
myThread-4 work1
myThreadPool-4 work
...
下面观察一个现象,把上面的DefinedThreadPoolFactory类删除掉,将MyThreadPoolFactory类调整为下面这样:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class MyThreadPoolFactory {
@Autowired
private MyThreadPoolConfig config;
@Bean("myThreadPool")
public Executor myThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getCoreSize());
executor.setMaxPoolSize(config.getMaxSize());
executor.setQueueCapacity(config.getQueueCapacity());
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
executor.setThreadNamePrefix(config.getThreadNamePrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
@Autowired
private TaskExecutionProperties properties;
@Bean("defaultThreadPool")
public Executor defaultThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getPool().getCoreSize());
executor.setMaxPoolSize(properties.getPool().getMaxSize());
executor.setQueueCapacity(properties.getPool().getQueueCapacity());
executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().getSeconds());
executor.setThreadNamePrefix(properties.getThreadNamePrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
也就是将系统线程池也通过Bean的方式注入,这时在调用上面的打印线程名方法,看到输出内容变为下面这样:
SimpleAsyncTaskExecutor-1 work1
myThreadPool-1 work
myThreadPool-2 work
SimpleAsyncTaskExecutor-2 work1
myThreadPool-3 work
SimpleAsyncTaskExecutor-3 work1
myThreadPool-4 work
SimpleAsyncTaskExecutor-4 work1
myThreadPool-1 work
SimpleAsyncTaskExecutor-5 work1
myThreadPool-2 work
SimpleAsyncTaskExecutor-6 work1
myThreadPool-3 work
SimpleAsyncTaskExecutor-7 work1
myThreadPool-4 work
SimpleAsyncTaskExecutor-8 work1
...
指定线程池名的@Async方法还是使用该线程池创建线程,没有指定线程池名的@Async方法使用的是SimpleAsyncTaskExecutor线程池,并没有使用另外的线程池,这里需要注意SimpleAsyncTaskExecutor线程池每提交一个任务就会创建新的线程去执行,这样会极大的影响性能,没有达到线程复用的目的,在开发过程中一定要注意这种问题;其实在@EnableAsync注释中也有说明:
By default, Spring will be searching for an associated thread pool definition: either a unique org.springframework.core.task.TaskExecutor bean in the context, or an java.util.concurrent.Executor bean named “taskExecutor” otherwise. If neither of the two is resolvable, a org.springframework.core.task.SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.
所以,要解决上面出现的问题,我们可以有两种方式:
-
在项目中定义一个类实现AsyncConfigurer接口。
-
如果没有这个实现类,需要通过Bean的方式注入一个名为taskExecutor的Bean:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration public class MyThreadPoolFactory { @Autowired private MyThreadPoolConfig config; @Bean("myThreadPool") public Executor myThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(config.getCoreSize()); executor.setMaxPoolSize(config.getMaxSize()); executor.setQueueCapacity(config.getQueueCapacity()); executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); executor.setThreadNamePrefix(config.getThreadNamePrefix()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } @Autowired private TaskExecutionProperties properties; @Bean("taskExecutor") public Executor defaultThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(properties.getPool().getCoreSize()); executor.setMaxPoolSize(properties.getPool().getMaxSize()); executor.setQueueCapacity(properties.getPool().getQueueCapacity()); executor.setKeepAliveSeconds((int) properties.getPool().getKeepAlive().getSeconds()); executor.setThreadNamePrefix(properties.getThreadNamePrefix()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
综合上面的内容总结如下:
- 使用@Async时需要在启动类中添加@EnableAsync注解开启该功能。
- 被@Async标记的方法在调用时需要注意避免在同一个类中不同方法间调用,因为spring是通过代理方式实现的,同一个类中的调用会导致不会创建新的线程执行方法。
- 在项目中使用Bean方式自定义了线程池,为了避免未指定线程池的方法能够正常执行,需要在项目中定义一个名为taskExecutor的线程池或者自定义一个类实现AsyncConfigurer接口。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181846.html