Java8 教程

Java8 并行流

在 Java8 中,并行流(Parallel Streams)是一个重要的新特性,它允许开发者以声明性方式在多个线程上并行处理数据。并行流是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这种处理方式可以显著提高数据处理的速度,尤其是在处理大量数据时。

创建并行流

Java8 中的 Stream API 允许开发者通过调用 parallel() 方法将一个串行流转换成并行流,同时也可以通过 sequential() 方法将并行流转回顺序流。并行流内部使用了 Java 的 ForkJoinPool,这是一个为并行计算设计的线程池。默认情况下,并行流的线程数等于处理器的核心数,但也可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来调整。

注意,并行流的处理过程几乎是自动的,但是你需要遵守一些约定。首先,你必须有一个并行流。默认情况下,流操作会创建一个串行流,方法 Collection.parallelStream() 除外。parallel 方法可以将任意的串行流转换为一个并行流。例如:

String[] words = new String[]{"one", "two", "three", "four", "five",
               "six", "seven", "eight", "nine"};
Stream<String> stream = Stream.of(words);
stream.parallel().forEach(e -> {
   System.out.println(Thread.currentThread().getName() + " : " + e);
});
//输出:
//main : six
//main : five
//main : eight
//main : nine
//main : seven
//ForkJoinPool.commonPool-worker-1 : three
//ForkJoinPool.commonPool-worker-1 : four
//ForkJoinPool.commonPool-worker-1 : one
//main : two

只要在终止方法执行时,流处于并行模式,那么所有延迟执行的流操作就都会被并行执行。

当并行运行流操作时,它应当返回与串行运行时相同的结果。很重要的一点是,这些操作都是无状态的,因此可以以任意顺序被执行。

使用并行流

一旦你有了并行流,你就可以使用与顺序流(sequential streams)相同的中间操作和终端操作。请注意,并行流并不总是比顺序流更快,特别是当数据量很小或处理逻辑非常复杂时。此外,并行化可能引入额外的开销,如线程创建和同步。

以下是一个使用并行流进行求和的示例:

package com.hxstrive.jdk8.stream_api.parallel;

import java.util.Arrays;
import java.util.List;

/**
 * 并行流
 * @author hxstrive.com
 */
public class ParallelDemo4 {

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // 通过并行流求和
        int sum = numbers.parallelStream()
                .mapToInt(Integer::intValue)
                .sum();
        System.out.println("The sum is: " + sum);
        //输出:
        //The sum is: 45
    }

}

并行流使用的一些建议:

  • 尽量避免在并行流中进行状态修改,因为这可能会导致数据竞争和不一致的结果。

  • 当处理的数据类型不是原始类型时,要注意自动装箱和拆箱操作可能会降低性能。Java8 提供了原始类型流(如 IntStream、LongStream、DoubleStream)来避免这种性能损失。

  • 在使用并行流时,要确保所使用的数据结构(如 List、Set、Map)是线程安全的,或者对并行流中的操作进行了适当的同步处理。

java.util.concurrent.ForkJoinPool.common.parallelism 介绍

java.util.concurrent.ForkJoinPool.common.parallelism 是 Java 并发库中 ForkJoinPool 的一个静态字段,用于指定公共池中可用于并行任务执行的最大线程数。

  • 设置该值

你可以通过调用 ForkJoinPool 类的 setCommonPoolParallelism 方法来设置这个值。这个方法接受一个整数参数,表示新的并行度。你必须在创建公共池之前调用这个方法。

ForkJoinPool.setCommonPoolParallelism(int parallelism);
  • 获取该值

你可以通过调用 ForkJoinPool 类的 getCommonPoolParallelism 方法来获取当前设置的并行度。

int parallelism = ForkJoinPool.getCommonPoolParallelism();
说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号