结合操作

Combing(组合)

Merge(合并)

在”异步的世界“中经常会创建这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava 的 merge()方法将帮助你把两个甚至更多的 Observables 合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的 Observable。

正如你看到的那样,发射的数据被交叉合并到一个 Observable 里面。注意如果你同步的合并 Observable,它们将连接在一起并且不会交叉。

Observable<Integer> observable_1 = Observable.from(new Integer[]{1, 2});

Observable<Integer> observable_2 = Observable.from(new Integer[]{2, 3});

Observable<Integer> observable_combined = Observable.merge(observable_1, observable_2);

observable_combined.subscribe(
        (value) -> {

            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
);

需要注意的是,在上述代码中,最终值的输出序列还是 1,2,2,3,这是因为两个 Observable 都是在 Main Thread 中执行,我们来看看如果用subscribeOn让每个 Observable 在不同线程中执行的效果:

Observable<Object> observable_1 = Observable.create(subscriber -> {
    try {
        Thread.sleep(1000l);

        subscriber.onNext(1);

        Thread.sleep(3000l);

        subscriber.onNext(2);

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).subscribeOn(Schedulers.newThread());

Observable<Object> observable_2 = Observable.create(subscriber -> {
    try {

        Thread.sleep(2000l);

        subscriber.onNext(3);

        Thread.sleep(4000l);

        subscriber.onNext(4);

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
})
        .subscribeOn(Schedulers.newThread());

Observable<Object> observable_combined = Observable.merge(observable_1, observable_2);

observable_combined.subscribe(
        (value) -> {

            System.out.println(Thread.currentThread().getName() + " Emited!");

            System.out.println(value);
        }
);

最终的结果如下所示:

RxNewThreadScheduler-1 Emited!
1
RxNewThreadScheduler-2 Emited!
3
RxNewThreadScheduler-1 Emited!
2
RxNewThreadScheduler-2 Emited!
4
上一页
下一页