辅助操作

Utility

delay:延迟发射或者监听

顾名思义,delay 操作会延时一段时间再发射数据。有两种方式实现这个效果;一是缓存这些数据,等一段时间后再发射;或者是把 Subscriber 订阅的时间延迟。

delay()

简单的 delay 函数只是把每个数据都延时一段时间再发射,相当于把整个数据流都往后推迟了。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
            .delay(1, TimeUnit.SECONDS)
            .timeInterval()
            .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=1109, value=0]
TimeInterval [intervalInMilliseconds=94, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=101, value=4]

可以看到,第一个数据差不多被延迟了 1s,后面每隔 100ms 左右发射下一个数据。还可以分别推迟每个数据的时间。

这个重载函数的参数为一个函数,该函数的参数为源 Observable 发射的数据返回一个 信号 Observable。当信号 Observable 发射数据的时候,也就是源 Observable 的数据发射的时候。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
    .delay(i -> Observable.timer(i * 100, TimeUnit.MILLISECONDS))
    .timeInterval()
    .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=152, value=0]
TimeInterval [intervalInMilliseconds=173, value=1]
TimeInterval [intervalInMilliseconds=199, value=2]
TimeInterval [intervalInMilliseconds=201, value=3]
TimeInterval [intervalInMilliseconds=199, value=4]

源 Observable 每隔 100ms 发射一个数据,而结果显示为 200ms 发射一个数据。interval 从 0 开始发射数据,i 结果为 0、1、2 等,每隔数据推迟了 i*100ms 再发射。所以后面每隔数据都比前一个数据多推迟了 100ms,结果就是每个数据差不多间隔 200ms 发射。

delaySubscription

除了缓存数据,延迟发射缓冲的数据以外,还可以选择使用推迟订阅的方式。根据 Observable 是 hot 或者 cold 则会有不同的结果。后面会专门的介绍 cold 和 hot Observable 的区别。这里的示例为 cold Observable,推迟订阅到 cold Observable 和推迟整个数据流是一样的效果。但是由于推迟订阅不需要缓存发射的数据,所以更加高效。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
    .delaySubscription(1000, TimeUnit.MILLISECONDS)
    .timeInterval()
    .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=1114, value=0]
TimeInterval [intervalInMilliseconds=92, value=1]
TimeInterval [intervalInMilliseconds=101, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=99, value=4]

可以看到整个数据流推迟了 1000ms。同样还有一个重载函数,可以使用另外一个 Observable 来告诉 Subscriber 何时订阅:

public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay)
上一页
下一页