Java8 教程

Java8 CompletableFuture 类

java.util.concurrent 库提供了一个 Future<T> 接口,用来表示一个在将来某个时间点可用的、类型为 T 的值。但是,到现在为止,Future 的使用还是相当局限。

Future<T> 接口

在 Java 中,Future<T> 是一个接口,它代表了一个异步计算的结果。这个接口是 Java 并发包(java.util.concurrent)的一部分,主要用于异步编程和并行计算。

Future<T> 接口在 Java 中是用于处理异步计算结果的重要工具。

Future<T> 提供了一种方式来获取异步任务的结果。它允许您启动一个异步操作,然后在稍后的时间点获取操作的结果或者检查操作是否完成。

以下是 Future<T> 接口的一些常见方法:

  • V get() throws InterruptedException, ExecutionException:等待计算完成,然后检索其结果。如果计算尚未完成,则此方法将阻塞。如果计算过程中出现异常,则此方法将抛出 ExecutionException。

  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException:等待计算完成,最多等待指定的时间长度,然后检索其结果。如果计算尚未完成,则此方法将抛出 TimeoutException。

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消计算。如果计算尚未开始,则取消计算并返回 true。如果计算已经开始,则此方法将尝试停止计算;如果 mayInterruptIfRunning 为 true,并且计算正在运行,则线程将收到 InterruptedException。

  • boolean isCancelled():如果计算被取消,则返回 true。

  • boolean isDone():如果计算已完成(正常结束、异常或取消),则返回 true。

示例:假设您有一个耗时的计算任务,您可以将其放在一个单独的线程中执行,并通过 Future 来获取结果。如下:

package com.hxstrive.jdk8.concurrent.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Future 接口
 * @author hxstrive.com
 */
public class FutureDemo1 {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // 模拟耗时计算
                Thread.sleep(3000);
                return 1024;
            }
        });

        // 阻塞等待结果
        Integer result = future.get();
        System.out.println("Result: " + result);
        // Result: 1024

        // 关闭线程池
        executor.shutdown();
    }

}

在上述示例中,通过 executor.submit() 提交一个异步任务,并获取一个 Future 对象。然后使用 future.get() 阻塞等待任务的结果。如果我们还有另一个异步任务,用来处理上例耗时任务的结果,如下:

package com.hxstrive.jdk8.concurrent.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Future 接口
 * @author hxstrive.com
 */
public class FutureDemo2 {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // 异步任务1:耗时任务
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // 模拟耗时计算
                Thread.sleep(3000);
                return 1024;
            }
        });
        // 阻塞等待结果
        Integer result = future.get();
        System.out.println("Result: " + result); // Result: 1024

        // 异步任务2:处理耗时任务的结果(必须等待异步任务1执行完成后才能执行)
        Future<Double> future2= executor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                return Math.log10(result);
            }
        });
        System.out.println("Result2: " + future2.get());

        // 关闭线程池
        executor.shutdown();
    }

}

发现没有,上面示例代码中,“异步任务2”必须等待“异步任务1”执行完成后才能继续处理。如果我们要实现在获取“异步任务1”结果时不阻塞,当结果可用时,自动执行“异步任务2”,此时,JDK8 新增的 CompletableFuture 类就能排上用了。

CompletableFuture 类

CompletableFuture 是 Java8 引入的一个类,是用于处理异步编程的强大工具,它在 Future 接口的基础上提供了更加丰富和灵活的功能。

CompletableFuture 实现了 Future 和 CompletionStage 接口,它提供了处理异步计算的结果,包括转换、组合和异常处理等功能。

CompletableFuture 包括的主要功能:

  • 异步执行:通过 supplyAsync 方法可以异步地执行任务,并返回一个 CompletableFuture 对象来表示异步操作的结果。

  • 链式调用:CompletableFuture 提供了许多链式调用的方法,如 thenApply、thenAccept、thenCompose 等,用于在异步操作完成后执行后续操作。这些方法都是非阻塞的,它们会立即返回一个新的 CompletableFuture 对象,表示后续操作的结果。

  • 组合操作:CompletableFuture 允许你将多个异步操作组合在一起,通过 thenCombine、thenAcceptBoth、applyToEither 等方法来实现。这些方法允许你等待多个异步操作完成,并对它们的结果进行组合处理。

  • 异常处理:CompletableFuture 提供了 exceptionally 方法来处理异步操作中抛出的异常。你可以指定一个函数来处理异常,并返回一个替代的结果。

  • 结果查询:通过 get 方法可以查询异步操作的结果。如果异步操作尚未完成,get 方法会阻塞当前线程,直到操作完成。此外,还可以使用 join 方法获取结果,但 join 方法不会抛出 ExecutionException 或 InterruptedException。

  • 函数式接口:CompletableFuture 的 API 大量使用了 Java8 的函数式接口,如 Function、Consumer、BiFunction 等,这使得你可以使用 lambda 表达式或方法引用来定义异步操作和后续处理逻辑。

CompletableFuture 类的常见方法:

  • 创建 CompletableFuture:

    • CompletableFuture<Void> runAsync(Runnable runnable):以异步方式执行一个 Runnable 任务,无返回值。

    • CompletableFuture<T> supplyAsync(Supplier<T> supplier):以异步方式执行一个返回结果的 Supplier 任务。

  • 处理结果:

    • thenApply(Function<? super T,? extends U> fn):当异步操作完成时,对结果应用一个函数进行转换,并返回一个新的 CompletableFuture 。

    • thenAccept(Consumer<? super T> action):当异步操作完成时,对结果执行一个消费操作。

    • thenRun(Runnable action):当异步操作完成时,执行一个无输入的后续操作。

  • 异常处理:

    • exceptionally(Function<Throwable,? extends T> fn):当异步操作出现异常时,进行异常处理并返回一个默认值或进行恢复操作。

  • 组合多个 CompletableFuture:

    • thenCompose(Function<? super T,? extends CompletableFuture<U>> fn):将两个异步操作串行连接,前一个操作的结果作为后一个操作的输入。

    • thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):将两个异步操作的结果进行组合处理。

示例:

(1)重写 Futrue 接口示例,使用 CompletableFuture 来实现,如下:

package com.hxstrive.jdk8.concurrent.future;

import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * CompletableFuture 类
 * @author hxstrive.com
 */
public class CompletableFutureDemo1 {

    public static void main(String[] args) throws Exception {
        // 异步任务1:耗时任务
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return 1024;
            }
        });
        System.out.println("future finished.");

        // 异步任务2:处理耗时任务的结果
        CompletableFuture<Double> future2 = future.thenApply(new Function<Integer, Double>() {
            @Override
            public Double apply(Integer result) {
                // 处理异步任务1的结果
                return Math.log10(result);
            }
        });
        System.out.println("future2 finished.");

        // 阻塞,等待所有任务完成
        Double result = future2.get();
        System.out.println("Result = " + result);
        //结果:
        //future finished.
        //future2 finished.
        //Result = 3.010299956639812
    }

}

上述示例,你会发现在“future2.get()”语句前均不会发生阻塞。

(2)使用 CompletableFuture 的 thenCombine() 方法将两个异步操作的结果进行组合处理,如下:

package com.hxstrive.jdk8.concurrent.future;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
 * CompletableFuture 类
 * @author hxstrive.com
 */
public class CompletableFutureDemo2 {

    public static void main(String[] args) throws Exception {
        // 异步任务1:耗时任务
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return 1024;
            }
        });
        System.out.println("future finished.");

        // 异步任务2:耗时任务2
        CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(new Supplier<Double>() {
            @Override
            public Double get() {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return Math.log10(1024);
            }
        });
        System.out.println("future2 finished.");

        // 将“异步任务1”和“异步任务2”的结果进行组合处理
        CompletableFuture<String> future3 = future2.thenCombine(future, new BiFunction<Double, Integer, String>() {
            @Override
            public String apply(Double aDouble, Integer integer) {
                return "future2=" + aDouble + ", future=" + integer;
            }
        });

        // 阻塞,等待所有任务完成
        String result = future3.get();
        System.out.println("Result = " + result);
        //结果:
        //future finished.
        //future2 finished.
        //Result = future2=3.010299956639812, future=1024
    }

}

(3)使用 CompletableFuture 的 thenCompose() 方法将两个异步操作串行连接,前一个操作的结果作为后一个操作的输入。如下:

package com.hxstrive.jdk8.concurrent.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * CompletableFuture 类
 * @author hxstrive.com
 */
public class CompletableFutureDemo3 {

    public static void main(String[] args) throws Exception {
        // 异步任务1:耗时任务
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("future finished.");
                return 1024;
            }
        });

        // 异步任务2:与“异步任务1”组合成串行连接,
        // 即“异步任务1”完成后,开始“异步任务2”
        CompletableFuture<Double> future2 = future.thenCompose(new Function<Integer, CompletionStage<Double>>() {
            @Override
            public CompletionStage<Double> apply(Integer integer) {
                return CompletableFuture.supplyAsync(new Supplier<Double>() {
                    @Override
                    public Double get() {
                        System.out.println("future2 integer=" + integer);
                        return Math.log10(integer);
                    }
                });
            }
        });

        // 阻塞,等待所有任务完成
        Double result = future2.get();
        System.out.println("Result = " + result);
        //结果:
        //future finished.
        //future2 integer=1024
        //Result = 3.010299956639812
    }

}

通过上术的三个实例,是不是觉得 CompletableFuture  非常强大。总之,CompletableFuture 为 Java 中的异步编程提供了更强大、更灵活和更易读的方式,有助于编写高效的并发应用程序。更多关于 CompletableFuture  的知识,可以参考官方API文档 https://docs.oracle.com/javase/8/docs/api/ 去了解。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号