Flowable

Flowable

在以前的 RxJava 版本中,只有一个基类可以处理可感知背压的源和不可感知背压的源 Observable。RxJava 2 在这两种源之间引入了明显的区别-背压感知源现在使用专用类 Flowable 表示。Observable 不支持背压,因此,我们应该将其用于仅消耗而无法影响的资源。

创建与控制

// Simple Flowable
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

// Flowable from Observable
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
  .toFlowable(BackpressureStrategy.BUFFER);

// from FlowableOnSubscribe
FlowableOnSubscribe<Integer> flowableOnSubscribe
 = flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
  .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

BackpressureStrategy

诸如 toFlowable()或 create() 之类的某些方法采用 BackpressureStrategy 作为参数。BackpressureStrategy 是一个枚举,它定义了我们将应用于 Flowable 的背压行为。它可以缓存或丢弃事件,或者根本不执行任何行为,在最后一种情况下,我们将负责使用反压运算符定义它。BackpressureStrategy 与 RxJava 先前版本中存在的 BackpressureMode 相似。

RxJava 2 提供了五种不同的策略。

Buffer

如果我们使用 BackpressureStrategy.BUFFER,则源将缓冲所有事件,直到订阅者可以使用它们:

public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000)
      .boxed()
      .collect(Collectors.toList());

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}

这类似于在 Flowable 上调用 onBackpressureBuffer() 方法,但是它不允许显式定义缓冲区大小或 onOverflow 操作。

Drop

我们可以使用 BackpressureStrategy.DROP 丢弃无法消耗的事件,而不是对其进行缓冲。同样,这类似于在 Flowable 上使用 onBackpressureDrop():

public void whenDropStrategyUsed_thenOnBackpressureDropped() {

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.DROP)
      .observeOn(Schedulers.computation())
      .test();
    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(!receivedInts.contains(100000));
 }

Latest

使用 BackpressureStrategy.LATEST 将强制源仅保留最新事件,从而在使用者无法跟上时覆盖任何先前的值:

public void whenLatestStrategyUsed_thenTheLastElementReceived() {

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.LATEST)
      .observeOn(Schedulers.computation())
      .test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
 }

当我们查看代码时,BackpressureStrategy.LATEST 和 BackpressureStrategy.DROP 看起来非常相似。但是,BackpressureStrategy.LATEST 将覆盖我们的 Subscriber 无法处理的元素,并仅保留最新的元素。另一方面,BackpressureStrategy.DROP 将丢弃无法处理的元素,这意味着不一定会发出最新元素。

Error

当我们使用 BackpressureStrategy.ERROR 时,我们只是在说我们不希望发生背压。因此,如果使用者跟不上源,就应该抛出 MissingBackpressureException:

public void whenErrorStrategyUsed_thenExceptionIsThrown() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.ERROR)
      .observeOn(Schedulers.computation())
      .test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

Missing

如果使用 BackpressureStrategy.MISSING,则源将推送元素而不会丢弃或缓冲。在这种情况下,下游将必须处理溢出:

public void whenMissingStrategyUsed_thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.MISSING)
      .observeOn(Schedulers.computation())
      .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

在我们的测试中,我们同时针对 ERROR 和 MISSING 策略都缺少 MissingbackpressureException。因为当源的内部缓冲区溢出时,它们都将引发此类异常。但是,值得注意的是,两者都有不同的目的。当我们完全不期望背压时,应该使用前一种,并且我们希望源抛出异常以防万一。如果我们不想在创建 Flowable 时指定默认行为,则可以使用后一种方法。

上一页
下一页