CompletableFuture

CompleteableFuture

CompletableFuture 类实现了 CompletionStage 和 Future 接口,JDK 吸收了 Guava 的设计思想,加入了 Future 的诸多扩展功能形成了 CompletableFuture,CompletableFuture 的基础用法就是当做对于未来对象的包裹使用:

// 无参构造函数简单的创建 CompletableFuture
CompletableFuture<String> completableFuture = new CompletableFuture<String>();

// 使用 CompletableFuture.get() 方法获取结果,该方法会一直阻塞直到 Future 完成
String result = completableFuture.get()

// 设置 Future 完成
completableFuture.complete("Future's Result")

我们也可以手动地监听完成函数:

CompletableFuture completableFuture = new CompletableFuture();

completableFuture.whenComplete(new BiConsumer() {
    @Override
    public void accept(Object o, Object o2) {
        //handle complete
    }
}); // complete the task

completableFuture.complete(new Object())

链式调用与转换

CompletableFuture 还提供了 runAsync/supplyAsync 等静态方法,让我们创建便捷地异步执行流程:

// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();

// 创建一个线程池,并传递给其中一个方法
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // ...
    TimeUnit.SECONDS.sleep(1);
    return "Result of the asynchronous computation";
}, executor);
// CompletionStage 是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());

CompletableFuture 的 then* 方法会生成 CompletionStage 对象,该对象的 then* 方法能够接收 Consumer:

// 获取上一个单元的执行结果,处理和改变 CompletableFuture 的结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
	return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
	System.out.println("Got product detail from remote service " + product.getName())
});

thenAccept()可以访问 CompletableFuture 的结果,但 thenRun()不能访 Future 的结果,它持有一个 Runnable 返回 CompletableFuture:

// 对上一步的计算结果不关心,执行下一个操作
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation
}).thenRun(() -> {
    // Computation Finished.
});

组合

使用 thenCompose()组合两个独立的 future, thenCombine() 当两个独立的 Future 都完成的时候,用来做一些事情。

// thenCompose, 独立等待
CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

// thenCombine, 等待全部执行完成
System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    ...
    TimeUnit.SECONDS.sleep(1);
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    ...
    TimeUnit.SECONDS.sleep(1);
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

CompletableFuture.allOf 的使用场景是当你一个列表的独立 future,并且你想在它们都完成后并行的做一些事情。

List<String> webPageLinks = Arrays.asList(...)	// A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个 CompletableFuture 完成的时候,返回一个新的 CompletableFuture。

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

异常处理

如果在原始的 supplyAsync()任务中发生一个错误,这时候没有任何 thenApply 会被调用并且 future 将以一个异常结束。如果在第一个 thenApply 发生错误,这时候第二个和第三个将不会被调用,同样的,future 将以异常结束。

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});

使用 exceptionally() 回调处理异常 exceptionally()回调给你一个从原始 Future 中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    ...
    throw new IllegalArgumentException("Age can not be negative");
    ...
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

使用 handle() 方法处理异常 API 提供了一个更通用的方法,handle()从异常恢复,无论一个异常是否发生它都会被调用。

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    ...
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

如果异常发生,res 参数将是 null,否则,ex 将是 null。

ListenableFuture

ListenableFuture 是 Guava 对原有 Future 的增强,可以用于监听 Future 任务的执行状况,是执行成功还是执行失败,并提供响应的接口用于对不同结果的处理。

ListenableFuture listenable = service.submit(...);

Futures.addCallback(listenable, new FutureCallback<Object>() {
    @Override
    public void onSuccess(Object o) {}

    @Override
    public void onFailure(Throwable throwable) {}
})

ListenableFuture 适用场景:

  • 如果一个主任务开始执行,然后需要执行各个小任务,并且需要等待返回结果,统一返回给前端,此时 Future 和 ListenableFuture 作用几乎差不多,都是通过 get()方法阻塞等待每个任务执行完毕返回。
  • 如果一个主任务开始执行,然后执行各个小任务,主任务不需要等待每个小任务执行完,不需要每个小任务的结果,此时用 ListenableFuture 非常合适,它提供的 FutureCallBack 接口可以对每个任务的成功或失败单独做出响应。
  • 如果我们希望各个小任务一旦计算完成就拿到结果展示给用户(push 出去)或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。
上一页