结合操作
Combing(组合)
Merge(合并)
在”异步的世界“中经常会创建这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava 的 merge()方法将帮助你把两个甚至更多的 Observables 合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的 Observable。 正如你看到的那样,发射的数据被交叉合并到一个 Observable 里面。注意如果你同步的合并 Observable,它们将连接在一起并且不会交叉。
Observable<Integer> observable_1 = Observable.from(new Integer[]{1, 2});
Observable<Integer> observable_2 = Observable.from(new Integer[]{2, 3});
Observable<Integer> observable_combined = Observable.merge(observable_1, observable_2);
observable_combined.subscribe(
(value) -> {
System.out.println(Thread.currentThread().getName() + " Emited!");
System.out.println(value);
}
);
需要注意的是,在上述代码中,最终值的输出序列还是 1,2,2,3,这是因为两个 Observable 都是在 Main Thread 中执行,我们来看看如果用subscribeOn
让每个 Observable 在不同线程中执行的效果:
Observable<Object> observable_1 = Observable.create(subscriber -> {
try {
Thread.sleep(1000l);
subscriber.onNext(1);
Thread.sleep(3000l);
subscriber.onNext(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).subscribeOn(Schedulers.newThread());
Observable<Object> observable_2 = Observable.create(subscriber -> {
try {
Thread.sleep(2000l);
subscriber.onNext(3);
Thread.sleep(4000l);
subscriber.onNext(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.subscribeOn(Schedulers.newThread());
Observable<Object> observable_combined = Observable.merge(observable_1, observable_2);
observable_combined.subscribe(
(value) -> {
System.out.println(Thread.currentThread().getName() + " Emited!");
System.out.println(value);
}
);
最终的结果如下所示:
RxNewThreadScheduler-1 Emited!
1
RxNewThreadScheduler-2 Emited!
3
RxNewThreadScheduler-1 Emited!
2
RxNewThreadScheduler-2 Emited!
4