变换操作
Transforming(转换)
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
Map/flatMap
Map 与 flatMap 的用法相近,都可以将某个 Observable 转化为另一个 Observable,不过它们的区别如下:
map
为一对一变换。可以将一个对象转换成另一个对象,或者将对象数组的每单个对象转换成新的对象数组的每单个对象。flatMap()
为一对多变换。可以将一个对象转换成一组对象,或者将对象数组的每单个对象转换成新的对象数组的每单组对象。flatMap()
中返回的是个Observable
对象,并且这个Observable
对象并不是被直接发送到了Subscriber
的回调方法中。
flatMap()
的原理是这样的:
- 使用传入的事件对象创建一个
Observable
对象; - 并不发送这个
Observable
, 而是将它激活,于是它开始发送事件; - 每一个创建出来的
Observable
发送的事件,都被汇入同一个Observable
,而这个Observable
负责将这些事件统一交给Subscriber
的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable
将初始的对象铺平之后通过统一路径分发了下去。而这个铺平就是 flatMap()
所谓的 flat。
以 Person 为例,一个 Person 对应一个身份证 id,一个 Person 可以有多个 Email。通过map()
可以将 Person 转换成 id,从而得到一个 Person 的身份证号码;通过flatMap()
可以将 Person 转换成一组 Email,从而得到一个 Person 的所有 Email。
public class MapObservable {
public static String[] names = new String[] { "A", "B", "C", "D" };
public static void mapNameToId() {
Observable
.from(names)
.map(
s -> {
return s.hashCode();
}
)
.subscribe(
integer -> {
System.out.println("Person Id is :" + integer);
}
);
}
public static void mapNameToEmails() {
Observable
.from(names)
.flatMap(
s -> {
return Observable.from(new String[] { s + "@b.com", s + "@a.com" });
}
)
.subscribe(
s -> {
System.out.println("Person Id is :" + s);
}
);
}
public static void main(String args[]) {
MapObservable.mapNameToId();
MapObservable.mapNameToEmails();
}
}
Exception Handler
当我们利用 flatMap 来链式调用 Observable 时,可能出现在中间的某个 Observable 抛出异常的情形,本例即是演示这种可能存在异常的情形,如下:
public static void flatMapWithException() {
Observable.from(names).
flatMap(s -> {
return Observable.<String>create(subscriber -> {
subscriber.onError(new Exception("Custom Exceptions"));
});
})
.flatMap(s -> {
System.out.println("In FlatMap 3");
return Observable.from(new String[]{s + "@b.com", s + "@a.com"});
})
.subscribe(s -> {
System.out.println("Person Id is :" + s);
},
throwable -> {
System.out.println(throwable.getMessage());
});
}
最终的输出结果直接就是:
Custom Exceptions
可以看出,一旦某个 Observable 抛出异常,那么会直接进入最后的 Subscriber。
Multiple Threads
RxJava 本身最大的优势即是可以进行便捷明晰的并发编程,当我们在连接多个 Observable 时候,可以选择将某些耗时较长的 Observable 放到子线程中运行,测试代码如下:
/**
* @function 演示在不同线程中的flatMap
*/
public static void flatMapWithMultipleThread() {
Observable.from(new String[]{"name"}).
flatMap(s -> {
return Observable.<String>create(subscriber -> {
System.out.println("FlatMap 1:" + Thread.currentThread().getName());
subscriber.onNext(s);
});
})
.flatMap(s -> {
return Observable.<String>create(subscriber -> {
try {
System.out.println("FlatMap 2:" + Thread.currentThread().getName());
Thread.sleep(1000l);
subscriber.onNext(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
})
.subscribe(s -> {
System.out.println("Subscriber:" + Thread.currentThread().getName());
},
throwable -> {
System.out.println(throwable.getMessage());
});
}
public static void main(String args[]) throws InterruptedException {
MapObservable.flatMapWithMultipleThread();
System.out.println("Before Stop");
//睡眠一段时间以等待所有的输出
Thread.sleep(5000l);
}
最终输出结果为:
FlatMap 1:main
FlatMap 2:main
Subscriber:main