10.3 通用响应式操作实战

10.3 通用响应式操作实战

Flux 和 Mono 是 Reactor 提供的最重要的组成部分,而这两个响应式类型所提供的操作就是粘合剂,这些操作将它们结合在一起,来创建数据流动的通道。在 Flux 和 Mono 之间,存在超过 500 种操作,其中的每一个可以被归类为:

  • 创建操作
  • 联合操作
  • 传输操作
  • 逻辑处理操作

把 500 个操作都印在这里来看看它们是如何工作的,这是件有趣的事情,但是在这一章节中没有这么大的空间给它。因此我在这个章节中选择了几个最有用的操作,我们先从创建操作开始。

注意:

哪里有 Mono 的例子吗?Mono 和 Flux 有很多相同的操作,所以它没有必要展示两次同样的操作。此外,Mono 虽然是有用的,但是对比 Flux 的操作来说,Mono 看上去还是没有那么有趣。在我们所写的例子中,大多数使用的都是 Flux,Mono 通常情况下有与 Flux 对等的操作。

10.3.1 创建响应式类型

当时长使用 Spring 中的响应式类型时,会从 respository 或是 service 中得到 Flux 或是 Mono,因此需要你自己创建一个。但是有时候你需要创建一个新的响应式发布者。

Reactor 为创建 Flux 和 Mono 提供了多个操作。在本节中,我们将介绍一些最有用的创建操作。

从对象进行创建

如果你想从 Flux 或是 Mono 创建一个或多个对象,你可以 Flux 或 Mono 中的静态方法 just() 去创建一个响应式类型,其中的数据由这些对象驱动。例如,下面这个测试方法就是使用 5 个 String 对象来创建一个 Flux:

@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux
        .just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}

这样就创建了一个 Flux,但它没有订阅者。要是没有订阅者,数据不会流动。以花园软管的思路进行类比,你已经把软管接到出水口了,另一端就是从自来水公司流出的水。但是水不会流动,除非你打开水龙头。对响应式类型的订阅就是打开数据流的方式。

要添加一个订阅者,可以调用 Flux 中的 subscribe() 方法:

fruitFlux.subscribe(
    f -> System.out.println("Here's some fruit: " + f);
);

subscribe() 中的 lambda 表达式实际上是 java.util.Consumer,用于创建响应式流的 Subscriber。由于调用了 subscribe() 方法,数据开始流动了。在这个例子中,不存在中间操作,因此数据直接从 Flux 流到了 Subscriber。

为了在运行过程中观察响应式类型,一个好方法就是将 Flux 或 Mono 打印到控制台里面。但是,测试 Flux 或 Mono 更好的方式是使用 Reactor 中的 StepVerifier。给定一个 Flux 或 Mono,StepVerifier 订阅这个响应式类型,然后对流中流动的数据应用断言,最后验证流以预期方式完成。

例如,为了验证规定的数据流经 fruitFlux,你可以写一个测试,如下所示:

StepVerifier.create(fruitFlux)
    .expectNext("Apple")
    .expectNext("Orange")
    .expectNext("Grape")
    .expectNext("Banana")
    .expectNext("Strawberry")
    .verifyComplete();

这个例子中,StepVerifier 订阅了 Flux,然后对每一个匹配到的期望的水果名字做断言。最后,它验证了 Strawberry 是由 Flux 生成的,对 Flux 的验证完毕。

在本章余下的示例中,将使用 StepVerifier 编写测试用例以验证某些行为,并帮助你了解某些操作是如何工作的,从而了解一些 Reactor 最有用的操作。

从集合创建

Flux 也可从任何的集合创建,如 Iterable 或是 Java Stream。图 10.3 使用弹珠图绘制了这是如何运行的:

图 10.3 Flux 可由数组、Iterable 或是 Stream 创建

![Acrobat_EgcrkessnX](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Acrobat_EgcrkessnX.png)

为了从数组创建一个 Flux,调用静态方法 fromArray(),然后将数组作为数据源传入:

@Test
public void createAFlux_fromArray() {
    String[] fruits = new String[] {
        "Apple", "Orange", "Grape", "Banana", "Strawberry" };

    Flux<String> fruitFlux = Flux.fromArray(fruits);
    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
}

因为当你从对象列表中创建 Flux 的时候,源数组包含了你使用到的相同的水果名称,所以被 Flux 所命中的数据有相同的值。这样一来,你就在验证这个 Flux 之前使用相同的 StepVerifier。

如果你需要从 java.util.List、java.util.Set 或任何实现了 java.lang.Iterable 接口的类创建 Flux,你可以将它传入静态方法 fromIterable() 中:

@Test
public void createAFlux_fromIterable() {
    List<String> fruitList = new ArrayList<>();
    fruitList.add("Apple");
    fruitList.add("Orange");
    fruitList.add("Grape");
    fruitList.add("Banana");
    fruitList.add("Strawberry");
    Flux<String> fruitFlux = Flux.fromIterable(fruitList);
    // ... verify steps
}

或是,如果你突然想要把你用得顺手的 Java Stream 作为 Flux 的源,你将会用到 fromStream() 方法:

@Test
public void createAFlux_fromStream() {
    Stream<String> fruitStream =
        Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
    Flux<String> fruitFlux = Flux.fromStream(fruitStream);
    // ... verify steps
}

这里还是一样地使用 StepVerifier 去验证需要发布到 Flux 的数据。

生成 Flux 数据

有时你不会使用到任何数据,仅仅需要把 Flux 作为一个计数器,输出递增的值。可以使用静态方法 range() 去创建一个 Flux 计数器。图 10.4 展示了 range() 是如何运作的。

图 10.4

![Acrobat_hparWHSeug](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Acrobat_hparWHSeug.png)

下面的测试方法展示了如何创建一个范围的 Flux:

@Test
public void createAFlux_range() {
    Flux<Integer> intervalFlux = Flux.range(1, 5);
    StepVerifier.create(intervalFlux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectNext(4)
        .expectNext(5)
        .verifyComplete();
}

在这个例子中,Flux 的范围为 1 到 5,StepVerifier 验证了它将会产生五个值,它们是整数的 1 到 5。

另一这类似于 ranger() 的来创建 Flux 的方法时 interval()。如同 range() 方法一样,interval() 创建一个输出递增值的 Flux。不过,inerval() 特别的一点在于,不是传递给它起始值和结束值,而是指定发出数据的时间间隔或频率。图 10.5 展示了 interval() 创建方法的弹珠图:

图 10.5

![Snipaste_2020-03-25_13-50-21](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-25_13-50-21.png)

例如,可以使用静态的 interval() 方法来创建每秒发送一个值的 Flux,如下所示:

@Test
public void createAFlux_interval() {
    Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
    StepVerifier.create(intervalFlux)
        .expectNext(0L)
        .expectNext(1L)
        .expectNext(2L)
        .expectNext(3L)
        .expectNext(4L)
        .verifyComplete();
}

注意,由 Flux 发出的值是从 0 开始一个一个递增的。另外,由于 interval() 没有给出的最大值,它会永远地运行。同时,也可以使用 take() 操作将结果限制为前 5 个数据。我们将在下一章更多地讨论 take() 操作。

10.3.2 响应式类型结合

你可能发现你需要将两种响应式类型以某种方式合并到一起。或者,在其他情况下,你可能需要将 Flux 分解成多个响应式类型。在本节中,我们将研究 Reactor 中 Flux 和 Mono 的结合和分解操作。

合并响应式类型

假设你有两个 Flux 流,并需要建立一个汇聚结果的 Flux,它会因为能够得到上流的 Flux 流,所以能够产生数据。为了将一个 Flux 与另一个合并,可以使用 mergeWith() 操作,如在图 10.6 展示的弹珠图一样:

图 10.6 合并两个交替发送信息的 Flux 流为一个新的 Flux 流

![Snipaste_2020-03-25_14-31-23](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-25_14-31-23.png)

例如,假设第一个 Flux 其值是电视和电影人物的名字,第二个 Flux 其值是食品的名称。下面的测试方法将展示如何使用 mergeWith() 方法合并两个 Flux 对象:

@Test
public void mergeFluxes() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));

    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250))
        .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
    StepVerifier.create(mergedFlux)
        .expectNext("Garfield")
        .expectNext("Lasagna")
        .expectNext("Kojak")
        .expectNext("Lollipops")
        .expectNext("Barbossa")
        .expectNext("Apples")
        .verifyComplete();
}

通常情况下,Flux 会尽可能快的快地发送数据。因此,需要在创建 Flux 的时候使用 delayElements() 操作,用来将数据发送速度减慢 —— 每 0.5s 发送一个数据。此外,你将 delaySubscription() 操作应用于 foodFlux,使得它在延迟 250ms 后才会发送数据,因此 foodFlux 将会在 characterFlux 之后执行。

合并这两个 Flux 对象后,新的合并后的 Flux 被创建。当 StepVerifier 订阅合并后的 Flux 时,它会依次订阅两个 Flux 源。

合并后的 Flux 发出的数据的顺序,与源发出的数据的时间顺序一致。由于两个 Flux 都被设置为固定频率发送数据,因此值会通过合并后的 Flux 交替出现 —— character…food…character…food 一直这样下去。如何其中任何一个 Flux 的发送时间被修改了的话,你可能会看到 2 个 charater 跟在 1 个 food 后面或是 2 个 food 跟在 1 个 character 后面的情况。

由于 mergeWith() 不能保证源之间的完美交替,这时可以考虑 zip() 操作。当两个 Flux 被压缩在一起时,结果就是一个新的 Flux 生产出一个元组数据,这个元组包含了来自每一个源 Flux 的数据项。图 10.7 展示出了两个 Flux 是如何被缩在一起的:

图 10.7 将两个 Flux 流压缩为一个 Flux

![Snipaste_2020-03-25_14-31-23](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-25_14-31-23-1585192471116.png)

为了看看 zip() 操作的执行情况,参考一下下面的测试方法,它把 character Flux 和 food Flux 压缩在了一起:

@Test
public void zipFluxes() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(zippedFlux)
        .expectNextMatches(p ->
        	p.getT1().equals("Garfield") &&
            p.getT2().equals("Lasagna"))
        .expectNextMatches(p ->
            p.getT1().equals("Kojak") &&
            p.getT2().equals("Lollipops"))
        .expectNextMatches(p ->
            p.getT1().equals("Barbossa") &&
            p.getT2().equals("Apples"))
        .verifyComplete();
}

注意,与 mergeWith() 不同的是,zip() 操作是一个静态的创建操作,通过它创建的 Flux 使 character 和 food 完美对齐。从压缩后的 Flux 发送出来的每个项目都是 Tuple2(包含两个对象的容器),其中包含每一个源 Flux 的数据。

如果你不想使用 Tuple2,而是想用一些使用其他类型,你可以提供给 zip() 你想产生任何对象的 Function 接口。

**图 10.8 **

![Snipaste_2020-03-26_11-37-13](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-26_11-37-13.png)

例如,以下的试验方法说明了如何压缩的 character Flux 和 food Flux,使得它产生 String 类型的的 Flux 对象:

@Test
public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux,
                                   (c, f) -> c + " eats " + f);

    StepVerifier.create(zippedFlux)
        .expectNext("Garfield eats Lasagna")
        .expectNext("Kojak eats Lollipops")
        .expectNext("Barbossa eats Apples")
        .verifyComplete();
}

给 zip() 的 Function 接口(这里给出一个 lambda 表达式)简单地把两个值连接成一句话,由压缩后的 Flux 进行数据发送。

选择第一个响应式类型进行发布

假设你有两个 Flux 对象,你只是想创建一个新的发送从第一个 Flux 产生值的 Flux,而不是将两个 Flux 合并在一起。如图 10.9 所示,first() 操作选择两个 Flux 对象的第一个对象然后输出它的值。

图 10.9 第一操作选择了第一个 Flux 来发送一个消息,此后仅该 Flux 产生消息

![Snipaste_2020-03-26_13-49-36](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-26_13-49-36.png)

下面的测试方法创建一个 fast Flux 和 slow Flux(这里的 “slow” 的意思是它在订阅之后 100ms 才发布数据)。通过使用 first(),它创建了一个新的 Flux,将只会发布从第一个源 Flux 发布的数据:

@Test
public void firstFlux() {
    Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
        .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

    Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);

    StepVerifier.create(firstFlux)
        .expectNext("hare")
        .expectNext("cheetah")
        .expectNext("squirrel")
        .verifyComplete();
}

在这种情况下,因为在 fast Flux 已经开始发布后 100ms,slow Flux 才开始发布数据,这样导致新创建的 Flux 将完全忽略 slow Flux,而只发布 fast flux 中的数据。

10.3.3 转换和过滤响应式流

当数据流过 stream,你可能需要过滤或是修改一些值。在本节中,我们将看到的是转换和过滤流过响应式流中的数据。

从响应式类型中过滤数据

当数据从 Flux 中流出时,过滤数据的最基本方法之一就是简单地忽略前几个条目。如图 10.10 所示,skip() 操作正是这样做的。

图 10.10 skip 操作在将剩余消息传递给结果流之前跳过指定数量的消息

![Snipaste_2020-03-26_14-09-54](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\Snipaste_2020-03-26_14-09-54.png)

给定一个包含多个条目的 Flux,skip() 操作将创建一个新的 Flux,该 Flux 在从源 Flux 发出剩余项之前跳过指定数量的项。下面的测试方法演示如何使用 skip():

@Test
public void skipAFew() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

在本例中,有五个字符串项的流。对该流调用 skip(3) 将生成一个新的流,该流跳过前三个项,并且只发布最后两个项。

你也许不是想跳过特定数量的项目,而是需要过一段时间再跳过前几个项目。skip() 操作的另一种形式(如图 10.11 所示)是生成一个流,该流在从源流发出项之前等待一段指定的时间。

图 10.11 skip 操作的另一种形式是在将消息传递到结果流之前等待一段时间

![10.11](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.11.png)

下面的测试方法使用 skip() 创建一个在发出任何值之前等待 4 秒的 Flux。由于该 Flux 是从项之间具有 1 秒延迟(使用 delayElements())的 Flux 创建的,因此只会发出最后两个项:

@Test
public void skipAFewSeconds() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1))
        .skip(Duration.ofSeconds(4));

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

你已经看到了 take() 操作的一个例子,但是根据 skip() 操作,take() 可以看作是 skip() 的反面。skip() 跳过前几个项,take() 只发出前几个项(如图 10.12 所示):

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .take(3);

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

图 10.12 take 操作只传递来自传入流量的前几个消息,然后取消订阅

![10.12](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.12.png)

与 skip() 一样,take() 也有一个基于持续时间而不是项目计数的可选项。它会在一段时间之后,将接收并发出与通过源 Flux 一样多的项。如图 10.13 所示:

图 10.13 take 操作的另一种形式是在某个时间过去后,将消息传递给结果流

![10.13](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.13.png)

以下测试方法使用 take() 的替代形式在订阅后的前 3.5 秒内发出尽可能多的项:

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

skip() 和 take() 操作可以看作是基于计数或持续时间的筛选条件的操作。对于更通用的 Flux 值过滤,会发现 filter() 操作非常有用。

给定一个决定一个项是否通过 Flux 的 Predicate,filter() 操作允许你根据需要的任何条件有选择地发布。图 10.14 中的弹珠图显示了 filter() 的工作原理。

图 10.14 传入的流可以被过滤,以便生成的流只接收与给定谓词匹配的消息。

![10.14](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.14.png)

要查看 filter() 的运行情况,请考虑以下测试方法:

@Test
public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Zion")
        .verifyComplete();
}

这里,filter() 被赋予一个 Predicate,它只接受没有空格的 String。因此,“Grand Canyon” 和 “Grand Teton” 被过滤掉。

也许你需要过滤的是你已经收到的任何项目。distinct() 操作(如图 10.15 所示)产生一个只发布源 Flux 中尚未发布的项的 Flux。

图 10.15 distinct 操作过滤掉所有重复的消息

![10.15](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.15.png)

在下面的测试中,只有唯一的 String 值将从不同的 Flux 中发出:

@Test
public void distinct() {
    Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();

    StepVerifier.create(animalFlux)
        .expectNext("dog", "cat", "bird", "anteater")
        .verifyComplete();
}

尽管 “dog” 和 “bird” 分别从源 Flux 中发布两次,但在 distinct Flux 中只发布一次。

映射响应式数据

对于 Flux 或 Mono,最常用的操作之一是将已发布的项转换为其他形式或类型。Reactor 为此提供 map() 和 flatMap() 操作。

map() 操作会创建一个 Flux,该 Flux 在重新发布之前,按照给定函数对其接收的每个对象执行指定的转换。图 10.16 说明了 map() 操作的工作原理。

图 10.16 map 操作在结果流上执行将传入消息转换为新消息

![10.16](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.16.png)

在以下测试方法中,表示篮球运动员的 String 值的 Flux 映射到 Player 对象的新 Flux:

@Test
public void map() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .map(n -> {
            String[] split = n.split("\\s");
            return new Player(split[0], split[1]);
        });

    StepVerifier.create(playerFlux)
        .expectNext(new Player("Michael", "Jordan"))
        .expectNext(new Player("Scottie", "Pippen"))
        .expectNext(new Player("Steve", "Kerr"))
        .verifyComplete();
}

给 map() 的 Function 接口(作为 lambda)将传入 String 以空格进行拆分,并使用生成的字符串数组创建 Player 对象。虽然用 just() 创建的流携带的是 String 对象,但是由 map() 生成的流携带的是 Player 对象。

关于 map() 的重要理解是,映射是同步执行的,因为每个项都是由源 Flux 发布的。如果要异步执行映射,应考虑使用 flatMap() 操作。

flatMap() 操作需要一些思考和实践才能变得很熟练。如图 10.17 所示,flatMap() 不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被压成一个新的 Flux。当与 subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。

图 10.17 转换映射操作使用中间流来执行转换,从而允许异步转换

![10.17](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.17.png)

下面的测试方法展示了 flatMap() 和 subscribeOn() 的用法:

@Test
public void flatMap() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .flatMap(n -> Mono.just(n).map(p -> {
            String[] split = p.split("\\s");
            return new Player(split[0], split[1]);
        })
        .subscribeOn(Schedulers.parallel())
        );

    List<Player> playerList = Arrays.asList(
        new Player("Michael", "Jordan"),
        new Player("Scottie", "Pippen"Pippen"),
        new Player("Steve", "Kerr"));

    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
}

请注意,flatMap() 被赋予一个 lambda 函数,该函数将传入 String 转换为 String 类型的 Mono。然后对 Mono 应用 map() 操作,将 String 转换为 Player。

如果你停在那里,产生的 Flux 将携带 Player 对象,以与 map() 示例相同的顺序同步生成。但是对 Mono 做的最后一件事是调用 subscribeOn() 来指示每个订阅应该在一个并行线程中进行。因此,可以异步和并行地执行多个传入 String 对象的映射操作。

尽管 subscribeOn() 的名称与 subscribe() 类似,但它们却截然不同。subscribe() 是一个动词,它订阅一个响应式流并有效地将其启动,而 subscribeOn() 则更具描述性,它指定了应该 如何 并发地处理订阅。Reactor 不强制任何特定的并发模型;通过 subscribeOn() 可以使用 Schedulers 程序中的一个静态方法指定要使用的并发模型。在本例中,使用了 parallel(),它是使用固定大小线程池的工作线程(大小与 CPU 内核的数量一样)。但是调度程序支持多个并发模型,如表 10.1 所述。

表 10.1 Schedulers 并发模型

Schedulers 方法 描述
.immediate() 在当前线程中执行订阅
.single() 在单个可重用线程中执行订阅,对所有调用方重复使用同一线程
.newSingle() 在每个调用专用线程中执行订阅
.elastic() 在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下 60 秒)
.parallel() 在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于 CPU 核心的数量。

使用 flatMap() 和 subscribeOn() 的好处是,可以通过将工作分成多个并行线程来增加流的吞吐量。但由于这项工作是并行完成的,无法保证先完成哪项工作,因此无法知道产生的 Flux 中排放的项目的顺序。因此,StepVerifier 只能验证发出的每个项是否存在于 Player 对象的预期列表中,并且在 Flux 完成之前将有三个这样的项。

在响应式流上缓冲数据

在处理流经 Flux 的数据的过程中,你可能会发现将数据流分解成比特大小的块是有帮助的。buffer() 操作(如图 10.18 所示)可以解决这个问题。

图 10.18 缓冲区操作会产生一个给定最大大小的列表流,这些列表是从传入的流中收集的

![10.18](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.18.png)

给定一个 String 值的 Flux,每个值都包含一个水果的名称,你可以创建一个新的 List 集合的 Flux,其中每个 List 的元素数不超过指定的数目:

@Test
public void buffer() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier
        .create(bufferedFlux)
        .expectNext(Arrays.asList("apple", "orange", "banana"))
        .expectNext(Arrays.asList("kiwi", "strawberry"))
        .verifyComplete();
}

在这种情况下,字符串元素的流量被缓冲到一个列表集合的新流量中,每个列表集合包含的项不超过三个。因此,发出五个字符串值的原始磁通量将转换为发出两个列表集合的磁通量,一个包含三个水果,另一个包含两个水果。

那又怎么样?将值从反应性流量缓冲到非反应性列表集合似乎适得其反。但是,当将 buffer()与 flatMap()结合使用时,它可以并行处理每个列表集合:

Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x ->
         Flux.fromIterable(x)
             .map(y -> y.toUpperCase())
             .subscribeOn(Schedulers.parallel())
             .log()
    ).subscribe();

在这个新示例中,仍然将五个 String 值的 Flux 缓冲到 List 集合的新 Flux 中,然后将 flatMap() 应用于 List 集合的 Flux。这将获取每个 List 缓冲区并从其元素创建一个新的 Flux,然后对其应用 map() 操作。因此,每个缓冲 List 在单独的线程中进一步并行处理。

为了证明它是有效的,我还包含了一个要应用于每个子 Flux 的 log() 操作。log() 操作只记录所有的 Reactor Streams 事件,这样你就可以看到真正发生了什么。因此,以下条目将写入日志(为了简洁起见,删除了时间组件):

[main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

日志条目清楚地显示,第一个缓冲区(apple、orange 和 banana)中的水果在 parallel-1 线程中处理。同时,在第二个缓冲区(kiwi 和 strawberry)中的水果在 parallel-2 线程中进行处理。从每个缓冲区的日志条目交织在一起这一事实可以明显看出,这两个缓冲区是并行处理的。

如果出于某种原因,需要将 Flux 发出的所有内容收集到 List 中,则可以调用不带参数的 buffer():

Flux<List<List>> bufferedFlux = fruitFlux.buffer();

这将产生一个新的 Flux,该 Flux 会发出一个包含源 Flux 发布的所有项的 List。使用 collectList() 操作也可以实现同样的功能,如图 10.19 中的弹珠图所示:

图 10.19 collect-list 操作产生一个 Mono,其中包含由传入 Flux 发出的所有消息的列表

![10.19](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.19.png)

collectList() 生成一个发布 List 的 Mono,而不是生成一个发布 List 的 Mono。以下测试方法说明了如何使用它:

@Test
public void collectList() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
    Mono<List<String>> fruitListMono = fruitFlux.collectList();

    StepVerifier
        .create(fruitListMono)
        .expectNext(Arrays.asList(
            "apple", "orange", "banana", "kiwi", "strawberry"))
        .verifyComplete();
}

一种更有趣的收集 Flux 发送的项目的方法是把它们存到 Map 中。如图 10.20 所示,collectMap() 操作产生一个 Mono,它发布一个 Map,其中填充了由给定 Function 计算其键值的条目。

图 10.20 collect-map 操作产生一个 Mono,其中包含由传入 Flux 发出的消息的 Map,其中的键来自传入消息的某些特性

![10.20](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.20.png)

要查看 collectMap() 的实际操作,请查看以下测试方法:

@Test
public void collectMap() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    Mono<Map<Character, String>> animalMapMono =
        animalFlux.collectMap(a -> a.charAt(0));

    StepVerifier
        .create(animalMapMono)
        .expectNextMatches(map -> {
            return
                map.size() == 3 &&
                map.get('a').equals("aardvark") &&
                map.get('e').equals("eagle") &&
                map.get('k').equals("kangaroo");
        })
        .verifyComplete();
}

源 Flux 发出了一些动物的名字。在该 Flux 中,可以使用 collectMap() 创建一个新的 Mono,该 Mono 发送一个 Map,其中的键值由动物名称的第一个字母确定,并且该值是动物名称本身。如果两个动物名以同一个字母开头(如 elephanteaglekoalakangaroo),则流经流的最后一个条目将覆盖所有先前的条目。

10.3.4 对反应类型执行逻辑操作

有时你只需要知道 Mono 或 Flux 发布的条目是否符合某些条件。all() 和 any() 操作将执行这样的逻辑。图 10.21 和 10.22 说明了 all() 和 any() 是如何工作的:

图 10.21 可以对 Flux 进行测试以确保所有消息在所有操作中都满足某些条件

![10.21](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.21.png)

图 10.22 可以对 Flux 进行测试以确保在任何操作中至少有一条消息满足某些条件

![10.22](E:\Document\spring-in-action-v5-translate\第三部分 响应式 Spring\第 10 章 Reactor 介绍\10.22.png)

假设你想知道由 Flux 发布的每个 String 都包含字母 a 或字母 k。下面的测试演示如何使用 all() 检查该条件:

@Test
public void all() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
    StepVerifier.create(hasAMono)
        .expectNext(true)
        .verifyComplete();

    Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
    StepVerifier.create(hasKMono)
        .expectNext(false)
        .verifyComplete();
}

在第一个 StepVerifier 中,检查字母 a。all 操作应用于源 Flux,从而生成 Boolean 类型的 Mono。在本例中,所有的动物名都包含字母 a,因此从产生的 Mono 发出 true。但是在第二个 StepVerifier 中,得到的 Mono 将发出 false,因为并非所有的动物名都包含 k。

与其执行全部满足或完全不满足的检查,不如满足至少有一个条目匹配。在这种情况下,any() 操作就是你所需要的。这个新的测试用例使用 any() 检查字母 tz

@Test
public void any() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
    StepVerifier.create(hasAMono)
        .expectNext(true)
        .verifyComplete();

    Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
    StepVerifier.create(hasZMono)
        .expectNext(false)
        .verifyComplete();
}

在第一个 StepVerifier 中,你会看到生成的 Mono 发出 true,因为至少有一个动物名有字母 t(特别是 elephant)。在第二个 StepVerifier 中,生成的 Mono 发出 false,因为没有一个动物名包含 z

上一页
下一页