Spring Boot之线程异步调用与线程池的使用

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 Spring Boot之线程异步调用与线程池的使用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

一、异步调用

@EnableAsync开启异步调用

 启动类添加@EnableAsync注解
@SpringBootApplication
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        
        SpringApplication.run(DemoApplication.class, args);
    }
}

创建异步/同步方法

@RestController
public class TestController {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private TestService testService;
    
    @GetMapping("async")
    public void testAsync() {
        long start = System.currentTimeMillis();
        logger.info("调用异步方法开始...");
        for (int i = 0; i < 2; i++) {
            testService.asyncMethod();
        }
        logger.info("调用异步方法结束...");
        long end = System.currentTimeMillis();
        logger.info("总耗时:{} ms", end - start);
    }

    @GetMapping("sync")
    public void testSync() {
        long start = System.currentTimeMillis();
        logger.info("调用同步方法开始...");
        for (int i = 0; i < 2; i++) {
            testService.syncMethod();
        }
        logger.info("调用同步方法结束...");
        long end = System.currentTimeMillis();
        logger.info("总耗时:{} ms", end - start);
    }

}

@Async标识异步方法

@Async注解标识方法为异步方法
@Service
public class TestService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async
    public void asyncMethod() {
        logger.info("异步方法执行开始...");
        sleep();
        logger.info("线程名称:{}", Thread.currentThread().getName());
        logger.info("异步方法执行结束...");
    }

    public void syncMethod() {
        logger.info("同步方法执行开始...");
        sleep();
        logger.info("线程名称:{}", Thread.currentThread().getName());
        logger.info("同步方法执行结束...");
    }

    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

异步方法测试


INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法开始...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法结束...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 总耗时:5 ms
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 线程名称:task-2
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 线程名称:task-1
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 异步方法执行结束...
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 异步方法执行结束...

同步方法测试

INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用同步方法开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 同步方法执行开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 线程名称:http-nio-8080-exec-1
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 同步方法执行结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 同步方法执行开始...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 线程名称:http-nio-8080-exec-1
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.service.TestService     : 同步方法执行结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用同步方法结束...
INFO 14904 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 总耗时:2023 ms

二、线程池的使用

spring封装了Java的多线程的实现,表现为TaskExecutor接口

public interface TaskExecutor extends Executor {
	void execute(Runnable task);
}

在这里插入图片描述

两个比较重要的实现:

ThreadPoolTaskExecutor

ThreadPoolTaskExecutor内部对ThreadPoolExecutor进行了包装,同时提供能够通过IOC的形式来配置线程池的各个参数

ThreadPoolTaskScheduler

ThreadPoolTaskScheduler内部对ScheduledThreadPoolExecutor进行了包装,除了能执行异步任务外,还支持定时/延迟任务的执行,属于一种高级特性。

使用默认线程池配置

SpringBoot默认情况下已经自动配置了ThreadPoolTaskExecutor到IOC容器中,需要的时候直接注入使用即可

@SpringBootTest
class SpringbootApplicationTests {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Test
    public void test() {
        //向线程池中提交10个任务
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTaskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("execute task:" + finalI + "  ThreadName: " + Thread.currentThread().getName());
                }
            });
        }
        System.out.println("核心线程数:" + threadPoolTaskExecutor.getCorePoolSize());
        System.out.println("最大线程数:" + threadPoolTaskExecutor.getMaxPoolSize());
        System.out.println("线程等待超时时间:" + threadPoolTaskExecutor.getKeepAliveSeconds());
        System.out.println("当前活跃的线程数:" + threadPoolTaskExecutor.getActiveCount());
        System.out.println("线程池内线程的名称前缀:" + threadPoolTaskExecutor.getThreadNamePrefix());
    }
}
execute task:1  ThreadName: task-2
execute task:2  ThreadName: task-3
execute task:3  ThreadName: task-4
execute task:0  ThreadName: task-1
execute task:4  ThreadName: task-5
execute task:5  ThreadName: task-6
execute task:6  ThreadName: task-7
execute task:7  ThreadName: task-8
execute task:8  ThreadName: task-7
execute task:9  ThreadName: task-6
核心线程数:8
最大线程数:2147483647
线程等待超时时间:60
当前活跃的线程数:0
线程池内线程的名称前缀:task-

对线程池参数配置

ThreadPoolTaskExecutor支持对线程池核心参数的重新配置,从而覆盖SpringBoot默认配置线程池参数

@Configuration
public class AsyncPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 线程池核心线程的数量,默认值为1(不能被重用的原因)
        executor.setCorePoolSize(16);
        // 线程池维护线程的最大数量,只有当核心线程都被用完并且缓冲队列满后,才会开始申超过请核心线程数的线程,默认值为Integer.MAX_VALUE
        executor.setMaxPoolSize(60);
        // 缓冲队列
        executor.setQueueCapacity(30);
        // 超出核心线程数外的线程在空闲时候的最大存活时间,默认为60秒
        executor.setKeepAliveSeconds(60);
        // 设置线程池内线程的名称前缀
        executor.setThreadNamePrefix("asyncThread");
        // 是否等待所有线程执行完毕才关闭线程池,默认值为false
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // waitForTasksToCompleteOnShutdown的等待的时长,默认值为0,即不等待
        executor.setAwaitTerminationSeconds(60);
        // 当没有线程可以被使用时的处理策略(拒绝任务),默认策略为abortPolicy
        /**
         * callerRunsPolicy:用于被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
         * abortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
         * discardOldestPolicy:当线程池中的数量等于最大线程数时、抛弃线程池中最后一个要执行的任务,并执行新传入的任务
         * discardPolicy:当线程池中的数量等于最大线程数时,不做任何动作
         */
        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        executor.setRejectedExecutionHandler(callerRunsPolicy);
		//直接初始化
        executor.initialize();
        return executor;
    }
}
execute task:0  ThreadName: asyncThread1
execute task:1  ThreadName: asyncThread2
execute task:2  ThreadName: asyncThread3
execute task:5  ThreadName: asyncThread6
execute task:6  ThreadName: asyncThread7
execute task:3  ThreadName: asyncThread4
execute task:7  ThreadName: asyncThread8
核心线程数:16
execute task:8  ThreadName: asyncThread9
execute task:9  ThreadName: asyncThread10
execute task:4  ThreadName: asyncThread5
最大线程数:60
线程等待超时时间:60
当前活跃的线程数:0
线程池内线程的名称前缀:asyncThread

对异步调用的优化

上述异步调用的异步方法内部会新启一个线程来执行,线程名称为task – 1。默认情况下的异步线程池配置使得线程不能被重用,每次调用异步方法都会新建一个线程,可自定义异步线程池,对线程池参数配置 ,从而进行优化解决.

@Async注解上指定线程池Bean名称

    @Async("asyncThreadPoolTaskExecutor")
    public void asyncMethod() {
        logger.info("异步方法执行开始...");
        sleep();
        logger.info("线程名称:{}", Thread.currentThread().getName());
        logger.info("异步方法执行结束...");
    }
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法开始...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法结束...
INFO 11340 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 总耗时:5 ms
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 线程名称:asyncThread1
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 线程名称:asyncThread2
INFO 11340 --- [         task-2] cn.ybzy.demo.service.TestService     : 异步方法执行结束...
INFO 11340 --- [         task-1] cn.ybzy.demo.service.TestService     : 异步方法执行结束...

多线程任务执行耗时统计

    @Test
    public void test2() throws Exception {
        long start = System.currentTimeMillis();
        int number=10;
        // 线程计数器
        CountDownLatch countDownLatch = new CountDownLatch(number);
        // 添加10个任务交由线程池执行
        for (int i = 0; i < number; i++) {
            int finalI = i;
            threadPoolTaskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(finalI*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 线程计数器自减
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName()+" 耗时: "+ (System.currentTimeMillis()-start));
                }
            });
        }

        //阻塞当前线程,等待线程池线程返回 latch=0
        try {
            countDownLatch.await(1,TimeUnit.MINUTES);
            long end = System.currentTimeMillis();
            System.out.println("任务执行总耗时: "+ (end-start));
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("任务执行超时...");
        }
    }
threadPoolInfo-1 耗时: 2
threadPoolInfo-2 耗时: 1004
threadPoolInfo-3 耗时: 2010
threadPoolInfo-4 耗时: 3010
threadPoolInfo-5 耗时: 4010
threadPoolInfo-6 耗时: 5009
threadPoolInfo-7 耗时: 6017
threadPoolInfo-8 耗时: 7019
threadPoolInfo-9 耗时: 8014
threadPoolInfo-10 耗时: 9007
任务执行总耗时: 9007

线程池定时任务的使用

schedule() 按cron表达式执行,需注意cron并不能保证在启动时能执行

scheduleAtFixedRate() 按照固定频率,能保证在启动时能执行

scheduleWithFixedDelay() 如果任务时间超过固定频率,按照任务实际时间延后
@SpringBootTest
class SpringbootApplicationTests {
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Test
    public void testCron() {
        String cron = "*/5 * * * * ?";
        threadPoolTaskScheduler.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + " schedule...... ");
            }
        }, new CronTrigger(cron));
        while (true) {

        }
    }

    @Test
    public void testFixedSchedule() {
        ScheduledFuture<?> future = threadPoolTaskScheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + " scheduleAtFixedRate....");
            }
        }, 1000);
        // 中断线程,取消任务
        // future.cancel(true);
        while (true) {

        }
    }
}

三、异步回调

异步调用回调

如果异步方法具有返回值的话,需要使用Future来接收回调值。

   @Async("asyncThreadPoolTaskExecutor")
    public Future<Boolean> asyncMethod() {
        logger.info("异步方法执行开始...");
        sleep();
        logger.info("线程名称:{}", Thread.currentThread().getName());
        logger.info("异步方法执行结束...");
        AsyncResult<Boolean> asyncResult = new AsyncResult<>(Boolean.TRUE);
        return asyncResult;
    }
    @GetMapping("async")
    public void testAsync() throws Exception {
        long start = System.currentTimeMillis();
        logger.info("调用异步方法开始...");
        for (int i = 0; i < 2; i++) {
            Future<Boolean> future = testService.asyncMethod();
            //Future的get方法为阻塞方法
            if (future.get()){
                logger.info("调用异步方法成功...");
            }else{
                logger.info("调用异步方法失败...");
            }

            // 在设定时间内没有返回值,抛出异常
            // Boolean aBoolean = future.get(2, TimeUnit.SECONDS);
        }
        logger.info("调用异步方法结束...");
        long end = System.currentTimeMillis();
        logger.info("总耗时:{} ms", end - start);
    }

Future的get方法为阻塞方法

INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法开始...
INFO 9868 --- [   asyncThread1] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 9868 --- [   asyncThread1] cn.ybzy.demo.service.TestService     : 线程名称:asyncThread1
INFO 9868 --- [   asyncThread1] cn.ybzy.demo.service.TestService     : 异步方法执行结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法成功...
INFO 9868 --- [   asyncThread2] cn.ybzy.demo.service.TestService     : 异步方法执行开始...
INFO 9868 --- [   asyncThread2] cn.ybzy.demo.service.TestService     : 线程名称:asyncThread2
INFO 9868 --- [   asyncThread2] cn.ybzy.demo.service.TestService     : 异步方法执行结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法成功...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 调用异步方法结束...
INFO 9868 --- [nio-8080-exec-1] cn.ybzy.demo.controller.TestController       : 总耗时:2025 ms

多线程任务执行回调

    public Boolean addTask() throws Exception {
        Future<Boolean> future = threadPoolTaskExecutor.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() {
                // TODO 业务操作
                return Boolean.TRUE;
            }
        });

        //阻塞5s获取结果
        return future.get(5, TimeUnit.SECONDS);
    }

    @Test
    public void test2() throws Exception {
        // 添加10个任务交由线程池执行
        for (int i = 0; i < 10; i++) {
            System.out.println("执行结果: " + this.addTask());
        }
    }

四、线程池监控

扩展ThreadPoolTaskExecutor

对ThreadPoolTaskExecutor进行扩展,从而实现对线程池的运行状况进行监控,如当有任务提交到线程池中执行的时候自动的打印线程池的运行参数状况

@Slf4j
public class ThreadPoolInfo extends ThreadPoolTaskExecutor {

    /**
     * 线程池运行状况数据信息
     *
     * @param info
     */
    private void threadPoolLogs(String info) {
        //线程名称前缀
        String prefix = this.getThreadNamePrefix();
        //任务总数
        long taskCount = this.getThreadPoolExecutor().getTaskCount();
        //已完成的任务数
        long completedTaskCount = this.getThreadPoolExecutor().getCompletedTaskCount();
        //当前正在执行任务的线程数
        int activeCount = this.getThreadPoolExecutor().getActiveCount();
        //任务等待队列中任务数
        int queueSize = this.getThreadPoolExecutor().getQueue().size();
        log.info("{},{},taskCout={},completedTaskCount={},activeCount={},queueSize={}",
                prefix, info, taskCount, completedTaskCount, activeCount, queueSize);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(task);
        threadPoolLogs("execute ");
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Future<T> future = super.submit(task);
        threadPoolLogs("submit");
        return future;
    }
}

扩展线程池配置

配置类中将threadPoolInfo添加到容器中

@Configuration
public class AsyncPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor threadPoolInfo() {
        ThreadPoolTaskExecutor threadPoolInfo = new ThreadPoolInfo();
        threadPoolInfo.setCorePoolSize(16);
        threadPoolInfo.setMaxPoolSize(60);
        threadPoolInfo.setKeepAliveSeconds(60);
        threadPoolInfo.setQueueCapacity(30);
        threadPoolInfo.setThreadNamePrefix("threadPoolInfo-");
        threadPoolInfo.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolInfo.setAwaitTerminationSeconds(60);
        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        threadPoolInfo.setRejectedExecutionHandler(callerRunsPolicy);
        threadPoolInfo.initialize();
        return threadPoolInfo;
    }
}

使用扩展线程池

明确指定使用threadPoolInfo这个Bean

@SpringBootTest
class SpringbootApplicationTests {

    @Qualifier("threadPoolInfo")
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Test
    public void test() {
        //向线程池中提交10个任务
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTaskExecutor.execute(new Runnable() {
                @Override
                public void run() {

                }
            });
        }
    }
}
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=1,completedTaskCount=0,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=2,completedTaskCount=1,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=3,completedTaskCount=2,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=4,completedTaskCount=3,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=5,completedTaskCount=4,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=6,completedTaskCount=4,activeCount=2,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=7,completedTaskCount=6,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=8,completedTaskCount=6,activeCount=2,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=9,completedTaskCount=8,activeCount=1,queueSize=0
  INFO 27752 --- [           main] cn.ybzy.demo.config.ThreadPoolInfo     : threadPoolInfo-,execute ,taskCout=10,completedTaskCount=9,activeCount=1,queueSize=0

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137026.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!