多线程之Fork/Join使用

人生之路坎坎坷坷,跌跌撞撞在所难免。但是,不论跌了多少次,你都必须坚强勇敢地站起来。任何时候,无论你面临着生命的何等困惑抑或经受着多少挫折,无论道路多艰难,希望变得如何渺茫,请你不要绝望,再试一次,坚持到底,成功终将属于勇不言败的你。

导读:本篇文章讲解 多线程之Fork/Join使用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

Fork/Join介绍

         Fork/Join框架是Java 7提供的用于并行执行任务的框架。具体是把大任务切分为小任务,再把小任务的结果汇总为大任务的结果。核心思想是工作窃取算法,工作窃取算法是指线程从其他任务队列中窃取任务执行。

如何使用Fork/Join

  • 分割任务:首先需要创建一个ForkJoin任务,执行该类的fork方法可以对任务不断切割,直到分割的子任务足够小
  • 合并任务执行结果:子任务执行的结果同一放在一个队列中,通过启动一个线程从队列中取执行结果。

常见使用场景

  • 大数据计算

简单的实例代码

public class Test {

    private Integer num;

    private String name;

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Test{" +
                "num=" + num +
                ", name='" + name + '\'' +
                '}';
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * 如何使用 forkjoin
 *   1.创建任务类 继承RecursiveTask<返回对象>
 *   2。ForkJoinPool pool = new ForkJoinPool();
 *     forkjoinPool 通过它来执行任务
 *     ForkJoinTask<返回对象> submit = pool.submit(task);
 *     submit.get();获取返回的对象
 * */
public class CountTask extends RecursiveTask<List<Test>> {

    // 临界值
    private static final int THRESHOLD = 10;

    private List<Integer> integers ;

    public CountTask(List<Integer> integers) {
        this.integers = integers;
    }

    @Override
    protected List<Test> compute() {
        List<Test> tests = new ArrayList<>();
        boolean compute = integers.size() <= THRESHOLD;
        if (compute) {
            // 真正执行的任务,分割好的最小任务
            for (int i = 0; i < integers.size(); i++) {
                Test test = new Test();
                test.setName("name"+i);
                test.setNum(i);
                tests.add(test);
            }
            System.out.println("执行方法任务中");
        } else {
            System.out.println("执行拆分任务开始");
            List<List<Integer>> lists = CountTask.averageAssign(integers, 2);
            // 递归
            CountTask task1 = new CountTask(lists.get(0));
            CountTask task2 = new CountTask(lists.get(1));
            // 拆分任务,把任务压入线程队列
            invokeAll(task1, task2);
            //得到小任务的值
            List<Test> task1Res = task1.join();
            List<Test> task2Res = task2.join();
            task1Res.addAll(task2Res);
            tests = task1Res;
            System.out.println("执行任务结束");
        }
        return tests;
    }

    /**
     * 将一组数据平均分成n组
     *
     * @param source 要分组的数据源
     * @param n      平均分成n组
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<>();
        int remainder = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value;
            if (remainder > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remainder--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }



    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> integers = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            integers.add(i);
        }
        ForkJoinPool pool = new ForkJoinPool();
        CountTask task = new CountTask(integers);
        ForkJoinTask<List<Test>> submit = pool.submit(task);
        List<Test> tests = submit.get();
        System.out.println("Final result:" + tests);
        System.out.println(tests.size());
        // 关闭线程池
        pool.shutdown();
    }
}

图解

多线程之Fork/Join使用

拓展

Java 8 stream 并行流 底层也是ForkJoin实现

Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。

ForkJoinPool.commonPool() 

使用共享线程池

new ForkJoinPool(num)

它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

创建自己的线程池,所以可以避免共享线程池,如果有需要,甚至可以分配比处理机数量更多的线程

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);

需要特别注意的是:

  1. ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。
  2. 这里继承的是RecursiveTask,还可以继承RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景
  3. 这一点是最容易忽略的地方,其实这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。
eftTask.fork();  
rightTask.fork();

替换为

invokeAll(leftTask, rightTask);

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

        首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

注意在多线程环境下 对集合进行操作 线程安全问题 

java.util.ConcurrentModificationException

可以使用线程安全的集合修饰

参考连接

高并发之Fork/Join框架使用及注意事项

Fork/Join框架原理及应用

Java8 parallelStream —— 替换默认的共享线程池ForkJoinPool.commonPool()

介绍 ForkJoinPool 的适用场景,实现原理

ForkJoinPool的使用

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/133922.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!