Java7 RecursiveTask 类

RecursiveTask 是 Java 中的一个抽象类,它是 ForkJoinTask 的子类:

public abstract class RecursiveTask<V>
extends ForkJoinTask<V> {
    //...
}

递归任务(RecursiveTask)用于表示有返回结果的递归分治任务。它可以将工作分割成更小的任务,并将这些小任务的结果合并成一个集体结果。拆分和合并可以在多个层面上进行。

RecursiveTask 的特点

支持返回结果

与 RecursiveAction 不同,RecursiveTask 的任务执行后会返回一个结果。这使得它非常适合用于需要进行计算并返回结果的任务,例如数值计算、数据查询等。

例如,计算一个数组中所有元素的和,可以将数组分成多个子数组,每个子数组作为一个子任务进行计算,最后将子任务的结果合并得到最终的和。

递归分治策略

同样遵循分治策略,将一个大任务分解为较小的子任务,然后递归地处理这些子任务,直到任务足够小可以直接处理。这种策略可以充分利用多核处理器的并行处理能力,提高任务的执行效率。

例如,在进行大规模数据排序时,可以将数据分成多个子数据集,每个子数据集作为一个子任务进行排序,最后将子任务的结果合并得到最终的排序结果。

RecursiveTask 主要方法

  • compute():这是一个抽象方法,需要在子类中被实现。它定义了任务的实际计算逻辑,包括任务的分解和合并。在 compute() 方法中,开发者需要决定任务是应该继续递归分解还是已经足够小可以直接计算。

  • fork():继承自 ForkJoinTask 的方法,用于在 ForkJoinPool 中异步执行当前任务。调用 fork() 后,当前任务会被安排到某个工作线程上执行,并且 fork() 会立即返回,允许调用者继续执行其他任务。

  • join():继承自 ForkJoinTask 的方法,用于等待当前任务完成并返回其结果。如果在一个任务中调用了另一个任务的fork(),然后需要等待那个任务完成并获取其结果,就会使用 join() 方法。

RecursiveTask 的使用

要使用 RecursiveTask,你需要定义一个继承自 RecursiveTask<V> 的子类,其中 V 是任务执行完成后返回的结果类型。在这个子类中,你需要实现 compute() 方法,该方法定义了任务的执行逻辑,包括任务的分解和合并。

示例:计算斐波那契数

斐波那契数列(Fibonacci sequence),又称黄金分割数列,因数学家莱昂纳多·斐波那契(Leonardo Fibonacci)以兔子繁殖为例子而引入,故又称为“兔子数列”。

斐波那契数列指的是这样一个数列:0、1、1、2、3、5、8、13、21、34、...。

即从第三项开始,每一项都等于前两项之和,呈现出一种特殊的增长模式。

斐波那契数列在自然界中广泛存在,比如植物的叶序、花朵的花瓣数、蜂巢的结构等都可能遵循斐波那契数列的规律。

实现代码:

package com.hxstrive.jdk7.forkjoin.recursive_task;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * RecursiveTask 示例:计算斐波那契数列
 * @author hxstrive.com
 */
public class RecursiveTaskDemo {

    static class Fibonacci extends RecursiveTask<Integer> {
        final int n;

        public Fibonacci(int n) { this.n = n; }

        // 0、1、1、2、3、5、8、13、21、34、...
        // 第三项开始,每一项都等于前两项之和
        @Override
        public Integer compute() {
            if (n <= 1) {
                return n;
            }

            // 前第一项
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();

            // 前第二项
            Fibonacci f2 = new Fibonacci(n - 2);

            // 等待前两项计算完成,求和
            return f2.compute() + f1.join();
        }
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        // 创建计算任务
        Fibonacci task = new Fibonacci(10);
        // 提交任务并获取结果
        ForkJoinTask<Integer> future = pool.submit(task);
        // 获取结果,注意:get() 是一个阻塞方法,会等待任务执行完成
        System.out.println("fib(10) = " + future.get());
        //结果:
        //fib(10) = 55
    }

}

示例:求和

给定一个整形数组,利用 RecursiveTask 实现对数组求和:

package com.hxstrive.jdk7.forkjoin.recursive_task;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * 数组求和
 * @author hxstrive.com
 */
public class RecursiveTaskDemo2 {

    static class SumTask extends RecursiveTask<Integer> {
        /** 待求和的数组 */
        private int[] array;
        /** 数组求和,开始下标位置 */
        private int start;
        /** 数组求和,结束下表位置 */
        private int end;

        public SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= 10) {
                // 任务足够小,直接计算
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // 任务较大,分解为两个子任务
                int mid = start + (end - start) / 2;

                // 创建两个子任务
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);

                // 执行子任务
                leftTask.fork();

                // 等待子任务执行完毕
                return rightTask.compute() + leftTask.join();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 创建 ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 创建任务
        SumTask sumTask = new SumTask(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 0, 10);
        // 提交任务,并获取结果
        ForkJoinTask<Integer> task = forkJoinPool.submit(sumTask);
        // 等待任务执行完毕
        System.out.println("result: " + task.get());
    }

}

运行结果如下:

result: 55

SumTask 的 compute() 方法说明:

如果 end - start <= 10,即当前任务的范围足够小(这里假设为10个元素或更少),则直接在这个范围内遍历数组并计算和。这是递归的基准情况,防止无限递归。

如果任务范围较大(超过10个),元素则将其分解为两个子任务:

  • 计算中间索引mid,将数组从中间分开。

  • 创建两个新的 SumTask 实例,分别处理左半部分和右半部分。

  • 使用 fork() 方法异步执行左子任务。这意味着左子任务将在单独的线程中执行,而当前线程(主任务)将继续执行。

然后,当前线程同步执行右子任务(通过调用 rightTask.compute())。由于 compute() 方法是递归的,如果右子任务的范围仍然很大,它也会进一步分解为更小的子任务。

最后,使用 join() 方法等待左子任务完成,并获取其返回的结果。然后,将左右两个子任务的结果相加,并返回这个和作为当前任务的最终结果。

⚠️注意事项

以下是 RecursiveTask 的注意事项:

子任务独立性 - 无共享状态或相互依赖

RecursiveTask 要求子任务之间不能有共享状态或相互依赖。每个子任务应该是独立的,可以独立地完成计算并返回结果。如果任务之间存在共享状态或相互依赖,可能会导致竞态条件、死锁或其他并发问题。

任务必须是可并行化的,即可以分解为多个独立的子任务并行执行。如果任务本身不可并行化,使用 RecursiveTask 可能不会带来性能提升,甚至可能由于线程管理开销而降低性能。

任务分解与合并

在任务分解时,需要合理设置分解的阈值。阈值设置得过高,可能导致任务无法充分并行化;阈值设置得过低,则可能引入过多的线程管理开销。因此,需要根据任务的具体情况和系统的处理能力来设置合适的阈值。

在子任务执行完成后,需要正确合并子任务的结果以得到最终结果。在 RecursiveTask 中,这通常通过 join() 方法来实现,它会阻塞当前线程直到子任务完成并返回结果。

异常处理

在 RecursiveTask 的实现中,需要妥善处理可能出现的异常。由于 RecursiveTask 是并行执行的,因此异常处理需要特别注意。一般来说,可以在 compute() 方法中捕获并处理异常,或者将异常信息传递给上层调用者。

避免过度分解

虽然递归分解可以提高处理效率,但过度分解可能会导致线程管理开销过大,从而降低性能。因此,需要根据实际情况合理控制任务分解的深度。

RecursiveTask 是通过 ForkJoinPool 来执行的,ForkJoinPool 善于利用窃取工作执行来加快任务的总体执行速度。因此,在使用 RecursiveTask 时,应该充分利用 ForkJoinPool 的这一优势来提高性能

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号