前言
点击查看原文 原文地址
- 第一部分:线程(Thread)与执行体(Executors)
- 第二部分:同步(Synchronization)与锁(Locks)
- 第三部分:原子变量与ConcurrentMap
欢迎浏览Java 8 并发教程的第二部分.本教程致力于使用简单而易于理解的代码实例来教授你关于java8中并发编程一些知识。接下来你会学到在并发编程中使用
synchronized
关键字,锁
,信号
来同步可变的共享变量。Java 并发API于Java5首次加入,在后来发布的版本中不断迭代完善。本文中出现的大部分概念也适合java8以下的版本,不单单针对java8。但是本文将大量使用java8 中的
lambda
表达式以及新的并发功能,如果你对lambda
表达式不是很熟悉的话可以查看这个教程:Java8 教程本文的代码示例中使用了两个帮助函数:
sleep(seconds)
和stop(executor)
public static void stop(ExecutorService executor) {
try {
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("termination interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("killing non-finished tasks");
}
executor.shutdownNow();
}
}
public static void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
AtomicInteger
java.concurrent.atomic
包里面包含了很多非常有用的进行原子操作的类。当一个操作的原子性的,我们就可以在多线程中安全的执行这个操作而不用使用synchronized
关键字或者锁。
原子类内部严重依赖 比较-交换这种指令(compare-and-swap (CAS)),一种被大部分现代CPU直接支持的原子操作指令。这些指令通常比使用锁同步的指令快的多。所以当我们仅仅是想在多线程的情况下修改一个可变变量的值,强烈建议优先使用原子类,而不是锁。
下面让我们一起看一下AtomicInteger
类的几个例子
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(atomicInt::incrementAndGet));
stop(executor);
System.out.println(atomicInt.get()); // => 1000
上例使用AtomicInteger
替代Integer
使的我们可以在不用同步的情况下依然可以线程安全方式同步使一个整形变量数值递增。incrementAndGet()
方法是原子操作的,所以我们可以在多线程的情况下安全的调用这个方法。
AtomicInteger 支持很多种原子操作。updateAndGet()
方法接受一个Lambda表达式,在这个lambda表达式中我们可以对整形变量做各种处理。
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 2000
accumulateAndGet()
方法接受一个IntBinaryOperator
类型的Lambda表达式。下面的例子演示了使用此方法并发的将0到1000的数相加。
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.accumulateAndGet(i, (n, m) -> n + m);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 499500
其他比较有用的原子类还有 AtomicBoolean, AtomicLong, AtomicReference
LongAdder
LongAdder
类可以代替AtomicLong
完成并发求和的功能。
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(adder::increment));
stop(executor);
System.out.println(adder.sumThenReset()); // => 1000
类似于原子数字类, LongAdder也 提供了add()
和increment()
方法,而且也是线程安全的。不过他们之间也有所不同,原子数字类是将结果累加到一个变量中,而LongAdder为了降低线程的争用而内部维护了一个变量集合。我们可以通过调用sum()
或者sumThenReset()
方法来获取真实结果。
这个类在更新频率大于读取频率的多线程场景下优于原子数字类。而获取分析数据就属于这种场景,例如你要统计服务器的访问量.LongAdder
类的缺点是占用内存较高,这是由于其在内存中维护了一个变量集合。
LongAccumulator
LongAccumulator 是LongAdder的一种广义版本。LongAccumulator
结合LongBinaryOperator
类型的Lambda表达式使用,如下例所示
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10)
.forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
stop(executor);
System.out.println(accumulator.getThenReset()); // => 2539
我们使用函数 2 * x + y
以及初始值1创建了LongAccumlator. 每次调用accumulate(i)
时,当前结果和i
的值都会作为参数传入lambda表达式中。
与LongAdder
类似,LongAccumlator
在内部也维护了一个变量的集合用来降低线程之间的争用。
ConcurrentMap
ConcurrentMap
接口扩展至map
接口,并且邓毅了很多非常有用的同步集合类型。Java 8 通过向这个接口添加新的方法进入了函数编程。
接下来的代码片段,我们准备使用下面这个简单的map来说明这些新方法:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
forEach()
方法接受一个BiConsumer
类型的lambda表达式,这个表达式需要连个参数,一个是map的key,一个value。这个方法可以替代同步map中使用entries迭代的 for-each 循环。
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
putIfAbsent()
方法只有当此map中不存在使用给定key获取的value时才压入一个新值。在ConcurrentHashMap
中,这个方法被实现为线程安全的,所以在多线程并发访问下不需要同步。
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0
getOrDefault()
方法当map中存在给定key值的value时就返回那个value,不然就返回默认值
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there
replaceAll()
方法接受一个BiFunction
类型的lambda表达式,这种表达式接受两个参数,返回一个value.
下面的代码将key值为”r2”的value替换成“d3”。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3
如果你只是想改变map中某一个entry
,则可以使用 compute()
方法。这个方法接受两个参数,一个是用于计算的key,另一个是BiFunction
类型的lambda表达式,这个表达式负责转化工作。下面的代码将key是“foo”的value转化成了value+value。
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar
compute()
方法还存在两个变种:computeIfAbsent()
和computeIfPresent()
.
最后介绍一个方法merge()
,这个方法可以将一个新值合并到map中已经存在的value上面去。这个方法接受三个参数,第一个是要合并的entry的key值,第二个是要合并的新值,第三个是一个BiFunction
类型的lambda表达式,这个表达式负责合并规则。例如下面的例子将“boo”合并到了key值为“foo”的value中去了。
map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo")); // boo was foo
ConcurrentHashMap
上面介绍的所有方法都是ConcurrentMap
接口中的方法,所以所有实现了此接口的类都可以使用这些方法。现在我们介绍一个最重要的实现类ConcurrentHashMap
,其中加入了很多关于并发操作的方法。
在Java 8中, 类似于平行流(parallel streams),这些方法通过ForkJoinPool.commonPool()
获取了一个ForkJoinPool
线程池。这个线程池根据你电脑的CPU核数来预设平行参数。例如我们的CPU是4核的,JVM就帮我预设了3平行度。
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
我们可以通过设置来改变着平行度
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
我们使用下面的这个map来演示相关的方法
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
Java 8 引入了3种类型的平行操作:forEach
,search
,reduce
.每一种操作都有四种类型,四种类型区别去参入参数的类型上:key、values、entries、key-value pair.
所有操作的第一个参数都是parallelismThreshold
.这个参数表明启动平行操作的最小集合数量,例如设置为500,只有当要操作的集合里面item的数量大于等于500才启用平行操作,不然都是单线程线性执行。
ForEach
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s\n",
key, value, Thread.currentThread().getName()));
// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
Search
下面的例子演示了search()
方法接受一个BiFunction
类型的lambda表达式,如果有符合条件的值则返回这个值,不然则返回null.
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
值的注意的是:当首次达到条件,后面就不会再继续搜索了,而且ConcurrentHashMap
是无序的,所以如果一个map中存在好几个符合搜索条件的值,返回结果不一定是第一个。
Reduce
reduce()
方法接受两个BiFunction
类型的lambda表达式作为参数,下面的例子演示了:第一个lambda表达式将没一个key-value 对连接起来成为一个值,第二个表达式又将第一个表达式连接起来的值再连接到一起。
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
System.out.println("Result: " + result);
// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
教程的代码托管在GitHub上地址
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/14808.html