Java7 RecursiveAction 类

RecursiveAction 是 Java 并发包 java.util.concurrent 中的一个抽象类,它继承自 ForkJoinTask,主要用于定义那些不需要返回结果的多线程任务。

RecursiveAction 是 Fork/Join 框架中的一个关键组件,允许开发者将复杂任务分解为更小的子任务,这些子任务可以并行执行,从而提高程序的执行效率。

RecursiveAction 是一个抽象类,它是 ForkJoinTask 的子类:

public abstract class RecursiveAction
extends ForkJoinTask<Void> {
    //...
}

RecursiveAction 的特点

  • 无返回值:与 RecursiveTask 不同,RecursiveAction 不需要返回结果。这使得它非常适合于那些只关注任务执行过程而不关心最终结果的场景。

  • 并行性:RecursiveAction 允许将任务分解为多个子任务,这些子任务可以并行执行,从而充分利用多核处理器的计算能力。

  • 分治策略:通过递归地将任务分解为更小的子任务,直到子任务足够小可以直接执行,RecursiveAction 实现了分治策略。这种策略有助于减少任务的总体执行时间。

  • 工作窃取算法:在 Fork/Join 框架中,RecursiveAction 的执行通常依赖于 ForkJoinPool。ForkJoinPool 使用工作窃取算法来平衡负载,确保所有线程都保持忙碌状态,从而提高资源利用率

RecursiveAction 的使用

要使用 RecursiveAction,通常需要定义一个继承自 RecursiveAction 的类,并实现其抽象方法 compute()。在 compute() 方法中,你需要定义如何分解任务以及如何执行子任务。

用法一:拆分任务

例如,我们通过继承 RecursiveAction 类,来实现一个大批量生成 UUID 的功能,将生成的 UUID 打印出来:

package com.hxstrive.jdk7.forkjoin.recursive_action;

import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * 生成10个UUID
 * @author hxstrive.com
 */
public class RecursiveActionDemo extends RecursiveAction {
    /** 要生成UUID的总数 */
    private int uuidCount;

    public RecursiveActionDemo(int uuidCount) {
        this.uuidCount = uuidCount;
    }

    /**
     * 生成一个随机的UUID字符串
     *
     * @return 返回生成的UUID字符串
     */
    private String getUUID() {
        return UUID.randomUUID().toString();
    }

    /**
     * 重写 compute 方法,用于生成 UUID 并进行输出或递归分解任务
     */
    @Override
    protected void compute() {
        // 如果任务足够小(生成一个UUID),则当前类直接执行,不去创建子任务
        if(uuidCount <= 1) {
            System.out.println(Thread.currentThread().getName() + " UUID:" + getUUID());
        } else {
            // 创建两个子任务,每个任务生成 uuidCount/2 个 UUID
            int half = uuidCount / 2;
            RecursiveActionDemo left = new RecursiveActionDemo(half);
            left.fork(); // 创建子任务并执行

            RecursiveActionDemo right = new RecursiveActionDemo(uuidCount - half);
            right.fork(); // 创建子任务并执行

            // 等待子任务完成
            left.join();
            right.join();
        }
    }

    /**
     * 主函数,程序的入口点。
     */
    public static void main(String[] args) {
        // 创建一个ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 执行任务
        forkJoinPool.invoke(new RecursiveActionDemo(10));
    }

}

运行代码,输出结果如下:

ForkJoinPool-1-worker-8 UUID:f0557b97-1335-44cd-9b2d-64540ea3c5e4
ForkJoinPool-1-worker-1 UUID:e88e2f16-cb6b-4ed8-835b-fe7cc279f209
ForkJoinPool-1-worker-8 UUID:2b83af5a-035f-4814-9e25-3772ee9f6d1c
ForkJoinPool-1-worker-7 UUID:1b85b304-9726-4c7e-928a-ea759dee13c5
ForkJoinPool-1-worker-2 UUID:d45ffa14-b30c-4f14-ac84-ba8e166e50bc
ForkJoinPool-1-worker-1 UUID:df19cdf7-40ce-4cbb-aba5-38d679235fe0
ForkJoinPool-1-worker-4 UUID:58c5885e-ceda-4f6a-a4f4-c892540bff31
ForkJoinPool-1-worker-5 UUID:53c9b939-c030-420b-ad61-29c8110ed633
ForkJoinPool-1-worker-3 UUID:d10628a8-1913-431f-a286-6a0adcbdcf07
ForkJoinPool-1-worker-6 UUID:5735ee67-cb7d-4da5-88e3-2f54f16bbaf8

上面示例非常简单,RecursiveActionDemo 只需将一个虚构的 uuidCount(待生成UUID的总数)作为其构造函数的参数。如果 uuidCount 超过某个阈值(大于1),工作就会被分割成子任务,这些子任务也会被调度执行(通过子任务的 .fork() 方法)。如果 uuidCount 低于某个阈值(小于等于1),则工作由 RecursiveActionDemo 本身执行。

用法二:执行任务,当作线程池

例如,我们有一项工作比较复杂,需要进行很多操作才能最终完成,而且任务中很多子步骤是并行的,而且每个子步骤进行的任务又不相同,这就使得我们不能利用 RecursiveAction 来对任务进行拆分。

通常,我们为了提高性能,可能需要使用线程池来执行任务,如下:

package com.hxstrive.jdk7.forkjoin.recursive_action;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * ExecutorService 线程池用法
 * @author hxstrive.com
 */
public class RecursiveActionDemo1 {

    public static void main(String[] args) {
        try {
            // 创建线程池
            ExecutorService executor = Executors.newFixedThreadPool(2);

            // 创建任务
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("动作1 start");
                }
            });

            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("动作2 start");
                }
            });

            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("动作3 start");
                }
            });

            // 关闭线程池
            executor.shutdown();

            // 等待线程池结束
            while (!executor.isTerminated()) {
                Thread.sleep(10);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

运行代码,输出结果如下:

动作1 start
动作2 start
动作3 start

上述示例将一个大任务的多个子步骤通过线程池并行执行,最后等待所有任务执行完成。当然,我们也可以将线程池做成全局的,然后通过 CountDownLatch 来同步等待所有子步骤的完成。

下面将使用 ForkJoinPool + RecursiveAction 来改造上面例子,达到相同的目的:

package com.hxstrive.jdk7.forkjoin.recursive_action;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * 不拆分任务
 * @author hxstrive.com
 */
public class RecursiveActionDemo2 {

    public static void main(String[] args) {
        try {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            // 动作1
            forkJoinPool.execute(new RecursiveAction() {
                @Override
                protected void compute() {
                    System.out.println("动作1 start");
                }
            });

            // 动作2
            forkJoinPool.execute(new RecursiveAction() {
                @Override
                protected void compute() {
                    System.out.println("动作2 start");
                }
            });

            // 动作3
            forkJoinPool.execute(new RecursiveAction() {
                @Override
                protected void compute() {
                    System.out.println("动作3 start");
                }
            });

            // 关闭 ForkJoinPool 线程池
            forkJoinPool.shutdown();

            // 等待所有任务执行完毕
            while(!forkJoinPool.isTerminated()) {
                Thread.sleep(10);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

运行代码,输出结果如下:

动作1 start
动作3 start
动作2 start

⚠️注意事项

在使用 RecursiveAction 时,需要注意以下几个方面:

正确实现 compute 方法

RecursiveAction 的核心是 compute() 方法,该方法必须被子类正确实现以定义如何分解任务和执行子任务。在 compute() 方法中,应该根据任务的复杂性和大小决定是否需要进一步分解任务。如果任务足够小,可以直接执行;如果任务较大,应该将其分解为更小的子任务。

避免过度拆分

虽然 RecursiveAction 采用分治策略将任务不断拆分成子任务,但如果拆分得过于细粒度,可能会导致过多的任务创建和管理开销。每个任务的创建和调度都需要一定的时间和资源,过度拆分可能会使性能下降。

例如,在处理一个大型数组时,如果将数组拆分成非常小的子数组,可能会导致创建大量的子任务,而这些子任务的管理开销可能会超过并行执行带来的性能提升。

考虑大任务的内存消耗

当处理非常大的任务时,需要注意内存的使用情况。如果任务拆分不当,可能会导致大量的子任务同时存在于内存中,从而耗尽内存资源。

例如,在处理一个巨大的数据集时,如果将数据集一次性加载到内存中并进行拆分,可能会导致内存溢出。在这种情况下,可以考虑采用流式处理或分批次处理的方式,以减少内存的使用。

调整任务大小阈值

在 RecursiveAction 的实现中,通常需要确定一个任务大小的阈值,当任务小于这个阈值时,不再进行拆分而直接执行。这个阈值的选择对性能有很大的影响。

如果阈值设置得过大,可能会导致任务无法充分利用并行性;如果阈值设置得过小,可能会导致过多的任务创建和管理开销。需要通过实验和性能测试来找到一个合适的阈值。

考虑硬件资源

在使用 RecursiveAction 时,需要考虑硬件资源的情况,特别是处理器的核心数量和内存大小。如果硬件资源有限,过度的并行化可能会导致性能下降。

例如,如果处理器核心数量较少,创建过多的子任务可能会导致任务切换和调度的开销增加,从而降低性能。在这种情况下,可以适当减少任务的并行度,以提高性能。

子任务的异常传播

当子任务发生异常时,需要确保异常能够正确地传播到父任务和调用者。如果异常没有被正确处理,可能会导致程序出现不可预期的行为。

例如,可以在 compute() 方法中使用 try-catch 块来捕获子任务的异常,并将异常重新抛出或进行适当的处理。同时,在调用 RecursiveAction 的地方也需要进行异常处理,以确保程序的稳定性。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号