结合操作
Combing(组合)
Merge(合并)
在”异步的世界“中经常会创建这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava 的 merge()方法将帮助你把两个甚至更多的 Observables 合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的 Observable。
  
      
Observable<Integer> observable_1 = Observable.from(new Integer[]{1, 2});
Observable<Integer> observable_2 = Observable.from(new Integer[]{2, 3});
Observable<Integer> observable_combined = Observable.merge(observable_1, observable_2);
observable_combined.subscribe(
        (value) -> {
            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
);
需要注意的是,在上述代码中,最终值的输出序列还是 1,2,2,3,这是因为两个 Observable 都是在 Main Thread 中执行,我们来看看如果用subscribeOn让每个 Observable 在不同线程中执行的效果:
Observable<Object> observable_1 = Observable.create(subscriber -> {
    try {
        Thread.sleep(1000l);
        subscriber.onNext(1);
        Thread.sleep(3000l);
        subscriber.onNext(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).subscribeOn(Schedulers.newThread());
Observable<Object> observable_2 = Observable.create(subscriber -> {
    try {
        Thread.sleep(2000l);
        subscriber.onNext(3);
        Thread.sleep(4000l);
        subscriber.onNext(4);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
})
        .subscribeOn(Schedulers.newThread());
Observable<Object> observable_combined = Observable.merge(observable_1, observable_2);
observable_combined.subscribe(
        (value) -> {
            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
);
最终的结果如下所示:
RxNewThreadScheduler-1 Emited!
1
RxNewThreadScheduler-2 Emited!
3
RxNewThreadScheduler-1 Emited!
2
RxNewThreadScheduler-2 Emited!
4
