Spring Boot之线程异步调用与线程池的使用
一、异步调用
@EnableAsync开启异步调用
启动类添加@EnableAsync注解
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
创建异步/同步方法
@RestController
public class TestController {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private TestService testService;
@GetMapping("async")
public void testAsync() {
long start = System.currentTimeMillis();
logger.info("调用异步方法开始...");
for (int i = 0; i < 2; i++) {
testService.asyncMethod();
}
logger.info("调用异步方法结束...");
long end = System.currentTimeMillis();
logger.info("总耗时:{} ms", end - start);
}
@GetMapping("sync")
public void testSync() {
long start = System.currentTimeMillis();
logger.info("调用同步方法开始...");
for (int i = 0; i < 2; i++) {
testService.syncMethod();
}
logger.info("调用同步方法结束...");
long end = System.currentTimeMillis();
logger.info("总耗时:{} ms", end - start);
}
}
@Async标识异步方法
@Async注解标识方法为异步方法
@Service
public class TestService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Async
public void asyncMethod() {
logger.info("异步方法执行开始...");
sleep();
logger.info("线程名称:{}", Thread.currentThread().getName());
logger.info("异步方法执行结束...");
}
public void syncMethod() {
logger.info("同步方法执行开始...");
sleep();
logger.info("线程名称:{}", Thread.currentThread().getName());
logger.info("同步方法执行结束...");
}
private void sleep() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
异步方法测试
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法开始...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法结束...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 总耗时:5 ms
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 线程名称:task-2
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 线程名称:task-1
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 异步方法执行结束...
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 异步方法执行结束...
同步方法测试
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用同步方法开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 同步方法执行开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 线程名称:http-nio-8080-exec-1
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 同步方法执行结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 同步方法执行开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 线程名称:http-nio-8080-exec-1
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService : 同步方法执行结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用同步方法结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 总耗时:2023 ms
二、线程池的使用
spring封装了Java的多线程的实现,表现为TaskExecutor接口
public interface TaskExecutor extends Executor {
void execute(Runnable task);
}
两个比较重要的实现:
ThreadPoolTaskExecutor
ThreadPoolTaskExecutor内部对ThreadPoolExecutor进行了包装,同时提供能够通过IOC的形式来配置线程池的各个参数
ThreadPoolTaskScheduler
ThreadPoolTaskScheduler内部对ScheduledThreadPoolExecutor进行了包装,除了能执行异步任务外,还支持定时/延迟任务的执行,属于一种高级特性。
使用默认线程池配置
SpringBoot默认情况下已经自动配置了ThreadPoolTaskExecutor到IOC容器中,需要的时候直接注入使用即可
@SpringBootTest
class SpringbootApplicationTests {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
public void test() {
//向线程池中提交10个任务
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("execute task:" + finalI + " ThreadName: " + Thread.currentThread().getName());
}
});
}
System.out.println("核心线程数:" + threadPoolTaskExecutor.getCorePoolSize());
System.out.println("最大线程数:" + threadPoolTaskExecutor.getMaxPoolSize());
System.out.println("线程等待超时时间:" + threadPoolTaskExecutor.getKeepAliveSeconds());
System.out.println("当前活跃的线程数:" + threadPoolTaskExecutor.getActiveCount());
System.out.println("线程池内线程的名称前缀:" + threadPoolTaskExecutor.getThreadNamePrefix());
}
}
execute task:1 ThreadName: task-2
execute task:2 ThreadName: task-3
execute task:3 ThreadName: task-4
execute task:0 ThreadName: task-1
execute task:4 ThreadName: task-5
execute task:5 ThreadName: task-6
execute task:6 ThreadName: task-7
execute task:7 ThreadName: task-8
execute task:8 ThreadName: task-7
execute task:9 ThreadName: task-6
核心线程数:8
最大线程数:2147483647
线程等待超时时间:60
当前活跃的线程数:0
线程池内线程的名称前缀:task-
对线程池参数配置
ThreadPoolTaskExecutor支持对线程池核心参数的重新配置,从而覆盖SpringBoot默认配置线程池参数
@Configuration
public class AsyncPoolConfig {
@Bean
public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池核心线程的数量,默认值为1(不能被重用的原因)
executor.setCorePoolSize(16);
// 线程池维护线程的最大数量,只有当核心线程都被用完并且缓冲队列满后,才会开始申超过请核心线程数的线程,默认值为Integer.MAX_VALUE
executor.setMaxPoolSize(60);
// 缓冲队列
executor.setQueueCapacity(30);
// 超出核心线程数外的线程在空闲时候的最大存活时间,默认为60秒
executor.setKeepAliveSeconds(60);
// 设置线程池内线程的名称前缀
executor.setThreadNamePrefix("asyncThread");
// 是否等待所有线程执行完毕才关闭线程池,默认值为false
executor.setWaitForTasksToCompleteOnShutdown(true);
// waitForTasksToCompleteOnShutdown的等待的时长,默认值为0,即不等待
executor.setAwaitTerminationSeconds(60);
// 当没有线程可以被使用时的处理策略(拒绝任务),默认策略为abortPolicy
/**
* callerRunsPolicy:用于被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
* abortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
* discardOldestPolicy:当线程池中的数量等于最大线程数时、抛弃线程池中最后一个要执行的任务,并执行新传入的任务
* discardPolicy:当线程池中的数量等于最大线程数时,不做任何动作
*/
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
executor.setRejectedExecutionHandler(callerRunsPolicy);
//直接初始化
executor.initialize();
return executor;
}
}
execute task:0 ThreadName: asyncThread1
execute task:1 ThreadName: asyncThread2
execute task:2 ThreadName: asyncThread3
execute task:5 ThreadName: asyncThread6
execute task:6 ThreadName: asyncThread7
execute task:3 ThreadName: asyncThread4
execute task:7 ThreadName: asyncThread8
核心线程数:16
execute task:8 ThreadName: asyncThread9
execute task:9 ThreadName: asyncThread10
execute task:4 ThreadName: asyncThread5
最大线程数:60
线程等待超时时间:60
当前活跃的线程数:0
线程池内线程的名称前缀:asyncThread
对异步调用的优化
上述异步调用的异步方法内部会新启一个线程来执行,线程名称为task – 1。默认情况下的异步线程池配置使得线程不能被重用,每次调用异步方法都会新建一个线程,可自定义异步线程池,对线程池参数配置 ,从而进行优化解决.
@Async注解上指定线程池Bean名称
@Async("asyncThreadPoolTaskExecutor")
public void asyncMethod() {
logger.info("异步方法执行开始...");
sleep();
logger.info("线程名称:{}", Thread.currentThread().getName());
logger.info("异步方法执行结束...");
}
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法开始...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法结束...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 总耗时:5 ms
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 线程名称:asyncThread1
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 线程名称:asyncThread2
INFO 11340 --- [ task-2] cn.ybzy.demo.service.TestService : 异步方法执行结束...
INFO 11340 --- [ task-1] cn.ybzy.demo.service.TestService : 异步方法执行结束...
多线程任务执行耗时统计
@Test
public void test2() throws Exception {
long start = System.currentTimeMillis();
int number=10;
// 线程计数器
CountDownLatch countDownLatch = new CountDownLatch(number);
// 添加10个任务交由线程池执行
for (int i = 0; i < number; i++) {
int finalI = i;
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(finalI*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程计数器自减
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName()+" 耗时: "+ (System.currentTimeMillis()-start));
}
});
}
//阻塞当前线程,等待线程池线程返回 latch=0
try {
countDownLatch.await(1,TimeUnit.MINUTES);
long end = System.currentTimeMillis();
System.out.println("任务执行总耗时: "+ (end-start));
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("任务执行超时...");
}
}
threadPoolInfo-1 耗时: 2
threadPoolInfo-2 耗时: 1004
threadPoolInfo-3 耗时: 2010
threadPoolInfo-4 耗时: 3010
threadPoolInfo-5 耗时: 4010
threadPoolInfo-6 耗时: 5009
threadPoolInfo-7 耗时: 6017
threadPoolInfo-8 耗时: 7019
threadPoolInfo-9 耗时: 8014
threadPoolInfo-10 耗时: 9007
任务执行总耗时: 9007
线程池定时任务的使用
schedule() 按cron表达式执行,需注意cron并不能保证在启动时能执行
scheduleAtFixedRate() 按照固定频率,能保证在启动时能执行
scheduleWithFixedDelay() 如果任务时间超过固定频率,按照任务实际时间延后
@SpringBootTest
class SpringbootApplicationTests {
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Test
public void testCron() {
String cron = "*/5 * * * * ?";
threadPoolTaskScheduler.schedule(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + " schedule...... ");
}
}, new CronTrigger(cron));
while (true) {
}
}
@Test
public void testFixedSchedule() {
ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + " scheduleAtFixedRate....");
}
}, 1000);
// 中断线程,取消任务
// future.cancel(true);
while (true) {
}
}
}
三、异步回调
异步调用回调
如果异步方法具有返回值的话,需要使用Future来接收回调值。
@Async("asyncThreadPoolTaskExecutor")
public Future<Boolean> asyncMethod() {
logger.info("异步方法执行开始...");
sleep();
logger.info("线程名称:{}", Thread.currentThread().getName());
logger.info("异步方法执行结束...");
AsyncResult<Boolean> asyncResult = new AsyncResult<>(Boolean.TRUE);
return asyncResult;
}
@GetMapping("async")
public void testAsync() throws Exception {
long start = System.currentTimeMillis();
logger.info("调用异步方法开始...");
for (int i = 0; i < 2; i++) {
Future<Boolean> future = testService.asyncMethod();
//Future的get方法为阻塞方法
if (future.get()){
logger.info("调用异步方法成功...");
}else{
logger.info("调用异步方法失败...");
}
// 在设定时间内没有返回值,抛出异常
// Boolean aBoolean = future.get(2, TimeUnit.SECONDS);
}
logger.info("调用异步方法结束...");
long end = System.currentTimeMillis();
logger.info("总耗时:{} ms", end - start);
}
Future的get方法为阻塞方法
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法开始...
INFO 9868 --- [ asyncThread1] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 9868 --- [ asyncThread1] cn.ybzy.demo.service.TestService : 线程名称:asyncThread1
INFO 9868 --- [ asyncThread1] cn.ybzy.demo.service.TestService : 异步方法执行结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法成功...
INFO 9868 --- [ asyncThread2] cn.ybzy.demo.service.TestService : 异步方法执行开始...
INFO 9868 --- [ asyncThread2] cn.ybzy.demo.service.TestService : 线程名称:asyncThread2
INFO 9868 --- [ asyncThread2] cn.ybzy.demo.service.TestService : 异步方法执行结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法成功...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 调用异步方法结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController : 总耗时:2025 ms
多线程任务执行回调
public Boolean addTask() throws Exception {
Future<Boolean> future = threadPoolTaskExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
// TODO 业务操作
return Boolean.TRUE;
}
});
//阻塞5s获取结果
return future.get(5, TimeUnit.SECONDS);
}
@Test
public void test2() throws Exception {
// 添加10个任务交由线程池执行
for (int i = 0; i < 10; i++) {
System.out.println("执行结果: " + this.addTask());
}
}
四、线程池监控
扩展ThreadPoolTaskExecutor
对ThreadPoolTaskExecutor进行扩展,从而实现对线程池的运行状况进行监控,如当有任务提交到线程池中执行的时候自动的打印线程池的运行参数状况
@Slf4j
public class ThreadPoolInfo extends ThreadPoolTaskExecutor {
/**
* 线程池运行状况数据信息
*
* @param info
*/
private void threadPoolLogs(String info) {
//线程名称前缀
String prefix = this.getThreadNamePrefix();
//任务总数
long taskCount = this.getThreadPoolExecutor().getTaskCount();
//已完成的任务数
long completedTaskCount = this.getThreadPoolExecutor().getCompletedTaskCount();
//当前正在执行任务的线程数
int activeCount = this.getThreadPoolExecutor().getActiveCount();
//任务等待队列中任务数
int queueSize = this.getThreadPoolExecutor().getQueue().size();
log.info("{},{},taskCout={},completedTaskCount={},activeCount={},queueSize={}",
prefix, info, taskCount, completedTaskCount, activeCount, queueSize);
}
@Override
public void execute(Runnable task) {
super.execute(task);
threadPoolLogs("execute ");
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Future<T> future = super.submit(task);
threadPoolLogs("submit");
return future;
}
}
扩展线程池配置
配置类中将threadPoolInfo添加到容器中
@Configuration
public class AsyncPoolConfig {
@Bean
public ThreadPoolTaskExecutor threadPoolInfo() {
ThreadPoolTaskExecutor threadPoolInfo = new ThreadPoolInfo();
threadPoolInfo.setCorePoolSize(16);
threadPoolInfo.setMaxPoolSize(60);
threadPoolInfo.setKeepAliveSeconds(60);
threadPoolInfo.setQueueCapacity(30);
threadPoolInfo.setThreadNamePrefix("threadPoolInfo-");
threadPoolInfo.setWaitForTasksToCompleteOnShutdown(true);
threadPoolInfo.setAwaitTerminationSeconds(60);
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
threadPoolInfo.setRejectedExecutionHandler(callerRunsPolicy);
threadPoolInfo.initialize();
return threadPoolInfo;
}
}
使用扩展线程池
明确指定使用threadPoolInfo这个Bean
@SpringBootTest
class SpringbootApplicationTests {
@Qualifier("threadPoolInfo")
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
public void test() {
//向线程池中提交10个任务
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
}
});
}
}
}
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=1,completedTaskCount=0,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=2,completedTaskCount=1,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=3,completedTaskCount=2,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=4,completedTaskCount=3,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=5,completedTaskCount=4,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=6,completedTaskCount=4,activeCount=2,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=7,completedTaskCount=6,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=8,completedTaskCount=6,activeCount=2,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=9,completedTaskCount=8,activeCount=1,queueSize=0
INFO 27752 --- [ main] cn.ybzy.demo.config.ThreadPoolInfo : threadPoolInfo-,execute ,taskCout=10,completedTaskCount=9,activeCount=1,queueSize=0
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137026.html