在 Java8 中,并行流(Parallel Streams)是一个重要的新特性,它允许开发者以声明性方式在多个线程上并行处理数据。并行流是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这种处理方式可以显著提高数据处理的速度,尤其是在处理大量数据时。
Java8 中的 Stream API 允许开发者通过调用 parallel() 方法将一个串行流转换成并行流,同时也可以通过 sequential() 方法将并行流转回顺序流。并行流内部使用了 Java 的 ForkJoinPool,这是一个为并行计算设计的线程池。默认情况下,并行流的线程数等于处理器的核心数,但也可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来调整。
注意,并行流的处理过程几乎是自动的,但是你需要遵守一些约定。首先,你必须有一个并行流。默认情况下,流操作会创建一个串行流,方法 Collection.parallelStream() 除外。parallel 方法可以将任意的串行流转换为一个并行流。例如:
String[] words = new String[]{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine"}; Stream<String> stream = Stream.of(words); stream.parallel().forEach(e -> { System.out.println(Thread.currentThread().getName() + " : " + e); }); //输出: //main : six //main : five //main : eight //main : nine //main : seven //ForkJoinPool.commonPool-worker-1 : three //ForkJoinPool.commonPool-worker-1 : four //ForkJoinPool.commonPool-worker-1 : one //main : two
只要在终止方法执行时,流处于并行模式,那么所有延迟执行的流操作就都会被并行执行。
当并行运行流操作时,它应当返回与串行运行时相同的结果。很重要的一点是,这些操作都是无状态的,因此可以以任意顺序被执行。
一旦你有了并行流,你就可以使用与顺序流(sequential streams)相同的中间操作和终端操作。请注意,并行流并不总是比顺序流更快,特别是当数据量很小或处理逻辑非常复杂时。此外,并行化可能引入额外的开销,如线程创建和同步。
以下是一个使用并行流进行求和的示例:
package com.hxstrive.jdk8.stream_api.parallel; import java.util.Arrays; import java.util.List; /** * 并行流 * @author hxstrive.com */ public class ParallelDemo4 { public static void main(String[] args) { List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); // 通过并行流求和 int sum = numbers.parallelStream() .mapToInt(Integer::intValue) .sum(); System.out.println("The sum is: " + sum); //输出: //The sum is: 45 } }
并行流使用的一些建议:
尽量避免在并行流中进行状态修改,因为这可能会导致数据竞争和不一致的结果。
当处理的数据类型不是原始类型时,要注意自动装箱和拆箱操作可能会降低性能。Java8 提供了原始类型流(如 IntStream、LongStream、DoubleStream)来避免这种性能损失。
在使用并行流时,要确保所使用的数据结构(如 List、Set、Map)是线程安全的,或者对并行流中的操作进行了适当的同步处理。
java.util.concurrent.ForkJoinPool.common.parallelism 是 Java 并发库中 ForkJoinPool 的一个静态字段,用于指定公共池中可用于并行任务执行的最大线程数。
设置该值
你可以通过调用 ForkJoinPool 类的 setCommonPoolParallelism 方法来设置这个值。这个方法接受一个整数参数,表示新的并行度。你必须在创建公共池之前调用这个方法。
ForkJoinPool.setCommonPoolParallelism(int parallelism);
获取该值
你可以通过调用 ForkJoinPool 类的 getCommonPoolParallelism 方法来获取当前设置的并行度。
int parallelism = ForkJoinPool.getCommonPoolParallelism();