线程池最优使用策略【Java线程池学习一】


  • 一、前言

  • 二、理论

    • 2-1、创建线程池的方式

    • 2-2、核心参数(重点)

  • 三、实践

    • 3-1、场景一

    • 3-2、场景二

    • 3-3、场景三



本篇博客的主要目的是指导如何在Java中优雅的使用线程池。这篇博客的内容是截止一周前我对线程池的理解,简单说就是工作了三年的人对线程池的理解。


一、前言

初学者对于多线程会有一种莫名的恐惧,会害怕出错,但其实多线程没想象中的那么复杂,主要两点:

  1. 对于线程的参数熟记于心(刚开始记不住很正常也没啥,收藏文章看几次就好了)
  2. 使用多线程的时候要注意 全局变量 和 异常处理

对我以往的面试来说(一年多工作经验的时候),掌握下面的理论知识就很充足了。


二、理论

2-1、创建线程池的方式

在实际的工作中我们一般有两种创建线程池的方式:


方式一:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,21, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));


方式二:不推荐、不推荐, 了解即可

// 创建一个单线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建一个定时执行的线程池
ScheduledExecutorService executorService1 = Executors.newScheduledThreadPool(1);
// 创建一个可缓存的线程池
ExecutorService executorService2 = Executors.newCachedThreadPool();
// 创建一个定长的线程池
ExecutorService executorService3 = Executors.newFixedThreadPool(1);


方式二点进各个方法,其实底层还是调用的 new ThreadPoolExecutor() 只是传递的参数不一致罢了。


2-2、核心参数(重点)

new ThreadPoolExecutor(1,21, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

创建线程池的时候会传递一堆参数,当你理解了这些参数再来使用线程池将畅通无阻。


ThreadPoolExecutor 的构造方法好有几个重载的,直接来看参数最全的一个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
 
{
                               
    // ...                                 
}
参数名称 含义
corePoolSize(核心线程数量) 线程池里面的常驻线程数
maximumPoolSize(最大线程数量) 当核心线程数处理不完,并且阻塞队列满了之后,就会开启新的线程来执行
unit(超时回收线程时间的单位) 超时时间的单位,配合上面使用
workQueue(阻塞队列) 当任务超过了核心线程能处理的范围后,新的任务会被放进阻塞队列  

常见的队列:  

1. ArrayBlockingQueue 基于数组实现的一个有界队列
2. LinkedBlockingQueue 基于链表实现的无界队列(使用的时候要设置大小)
3. SynchronousQueue 一个特殊的队列,不存储数据,一个任务进来直接给到消费者,它有公平模式(先进先出),和非公平模式(先进后出)
4. PriorityBlockingQueue 无界可扩容可排序的队列
5. DelayQueue 里面的元素必须实现Delayed接口,重写里面的getDelay、compareTo方法。

threadFactory(线程工厂)用来创建线程的工厂
handler(拒绝策略)当阻塞队列满了之后,并且线程数达到了最大线程数,这时候如果还有任务进来,就会被拒绝。

常见的拒绝策略

1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。(默认的策略)
2. DiscardPolicy:也是丢弃任务,但是不抛出异常。
3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4. CallerRunsPolicy:由调用线程处理该任务


注:

  1. 使用阻塞队列的时候要注意 LinkedBlockingQueue 是基于链表实现的一个无界队列,在使用的时候一定要设置一个大小,不然如果任务生产过快,会导致内存溢出。
  2. 超出核心线程数的任务,先进队列再开新线程(最大线程数)。
  3. 如果我们可以接受消费慢,其实我们可以考虑把拒绝策略设置成 CallerRunsPolicy (由调用线程处理该任务)


三、实践


3-1、场景一

有一个任务列表,每次最少选择一个任务,最多选择10个任务,绝大部分的时候是选择一个任务的,平均每个任务执行需要 0.3s。

最开始我们接收前端的参数是一个 List集合,每次从循环去处理数据,绝大部分的时候很快,但偶尔用户选择多个的时候就很慢了。

然后我们进行优化,使用多线程去处理,伪代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1100, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

public String fun(List<String> ids) throws InterruptedException {
    // 计数器,我们需要在多线程处理完毕后再来对结果进行汇总返回
    CountDownLatch countDownLatch = new CountDownLatch(ids.size());
    // 用来存放多线程的结果
    LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
    
    // 循环处理结果
    for (String id : ids) {
        executor.execute(() -> {
            
            // 业务逻辑处理, 并把处理的结果放入阻塞队列
            queue.add(funTest(id));

            countDownLatch.countDown();
        });
    }
    
    // 等待线程都执行完
    countDownLatch.await();
    
    // 处理全部的返回值 queue
    // ...
    
    return "ok";
}


3-2、场景二

一个查询列表接口,但因为前期表结构设计的问题,现在需要去三张表里面获取数据,然后在把结果按照时间排序返回。

这个场景侧重点对于每个表的数据处理,最终返回给前端的是一个List,但是表数据是不一样的,所以我们查询到数据后需要进行特殊处理,伪代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(130, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

public String fun() throws ExecutionException, InterruptedException {

    // 不同的任务去处理
    Future<Object> resultOne = executor.submit(funOne(), Object.class);
    Future<Object> resultTwo = executor.submit(funTwo(), Object.class);
    Future<Object> resultThree = executor.submit(funThree(), Object.class);

    // get 方法会阻塞直到返回结果
    Object one = resultOne.get();
    Object two = resultTwo.get();
    Object three = resultThree.get();

    // 数据集合组合返回
    // ...
    
    return "ok";
}


3-3、场景三

某个接口提供了查询功能,但数据量有100w需要按每页1w来查询,并把数据插入数据库。

上面两个的伪代码主要是基于业务的实现,这个代码我们着重来看一下使用多线程需要注意那些因素。

/**
 * 100w每次1w 只需要10次即可,但考虑到后续数据量的一个增长,我们这里还是设置了最大线程数为20,阻塞队列是10,这样做多可以容纳300w的数据,还超过的话会让当前线程直执行
 */

ThreadPoolExecutor executor = new ThreadPoolExecutor(5200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());

public String fun() {
    
    final int pageSize = 10000;
    BaseResult tmpBaseResult = baseRequest(1, pageSize);
    if (tmpBaseResult.code != 200) {
        return "fail";
    }
    
    CountDownLatch countDownLatch = new CountDownLatch(tmpBaseResult.totalCount);
    for (int i = 1;i <= tmpBaseResult.totalCount; i++) {
        Integer page = new Integer(i);
        
        executor.execute(()->{
            try {
                BaseResult baseResult = baseRequest(page, pageSize);
                if (tmpBaseResult.code == 200) {
                    try {
                        baseMapper.inserts(baseResult.lists);
                    }catch (Exception e) {
                        log.error("数据处理异常:数据库异常:{}", e);
                    }
                }
            }catch (Exception e) {
                log.error("数据处理异常:多线程处理异常:{}", e);
            }finally {
                countDownLatch.countDown();
            }
        });
    }
    
    // 等待线程都执行完
    countDownLatch.await();
    
    log.info("执行成功");
    return "ok";
}

private BaseResult baseRequest(int page, int pageSize) {
    BaseResult baseResult = new BaseResult();

    try {
        Map map = http.execute(page, pageSize);
        if (map.get("code") != 200) {
            baseResult.code = -1;
            log.error("数据处理异常:接口返回异常:{}", e);
            return baseResult;
        }
        baseResult.code = 200;
        baseResult.totalCount = map.get("allCount") % pageSize == 0 ? map.get("allCount") / pageSize : map.get("allCount") / pageSize + 1;
        baseResult.lists = (List) map.get("data");
    }catch (Exception e) {
        baseResult.code = -1;
        log.error("数据处理异常:请求接口异常:{}", e);
    }
    
    return baseResult;
}

public class BaseResult {

    /**
     * 200 正常
     */

    private Integer code;

    /**
     * 总条数
     */

    private Integer totalCount;

    /**
     * 数据结果
     */

    private List<Object> lists;

}


注:

  1. 一定要注意全局变量的使用,尽可能的不要使用全局变量
  2. 不要害怕代码多,一定要做好异常处理,不然多线程出了问题你都不知道


原文始发于微信公众号(小道仙97):线程池最优使用策略【Java线程池学习一】

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/204485.html

(0)
java面试题的头像java面试题

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!