变换操作
Transforming( 转换)
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
Map/flatMap
map
为一对一变换。可以将一个对象转换成另一个对象,或者将对象数组的每单个对象转换成新的对象数组的每单个对象。flatMap()
为一对多变换。可以将一个对象转换成一组对象,或者将对象数组的每单个对象转换成新的对象数组的每单组对象。flatMap()
中返回的是个Observable
对象,并且这个Observable
对象并不是被直接发送到了Subscriber
的回调方法中。
flatMap()
的原理是这样的:
- 使用传入的事件对象创建一个
Observable
对象; - 并不发送这个
Observable
, 而是将它激活,于是它开始发送事件; - 每一个创建出来的
Observable
发送的事件,都被汇入同一个Observable
,而这个Observable
负责将这些事件统一交给Subscriber
的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable
将初始的对象铺平之后通过统一路径分发了下去。而这个铺平就是 flatMap()
所谓的
以map()
可以将flatMap()
可以将
public class MapObservable {
public static String[] names = new String[] { "A", "B", "C", "D" };
public static void mapNameToId() {
Observable
.from(names)
.map(
s -> {
return s.hashCode();
}
)
.subscribe(
integer -> {
System.out.println("Person Id is :" + integer);
}
);
}
public static void mapNameToEmails() {
Observable
.from(names)
.flatMap(
s -> {
return Observable.from(new String[] { s + "@b.com", s + "@a.com" });
}
)
.subscribe(
s -> {
System.out.println("Person Id is :" + s);
}
);
}
public static void main(String args[]) {
MapObservable.mapNameToId();
MapObservable.mapNameToEmails();
}
}
Exception Handler
当我们利用
public static void flatMapWithException() {
Observable.from(names).
flatMap(s -> {
return Observable.<String>create(subscriber -> {
subscriber.onError(new Exception("Custom Exceptions"));
});
})
.flatMap(s -> {
System.out.println("In FlatMap 3");
return Observable.from(new String[]{s + "@b.com", s + "@a.com"});
})
.subscribe(s -> {
System.out.println("Person Id is :" + s);
},
throwable -> {
System.out.println(throwable.getMessage());
});
}
最终的输出结果直接就是
Custom Exceptions
可以看出,一旦某个
Multiple Threads
/**
* @function 演示在不同线程中的flatMap
*/
public static void flatMapWithMultipleThread() {
Observable.from(new String[]{"name"}).
flatMap(s -> {
return Observable.<String>create(subscriber -> {
System.out.println("FlatMap 1:" + Thread.currentThread().getName());
subscriber.onNext(s);
});
})
.flatMap(s -> {
return Observable.<String>create(subscriber -> {
try {
System.out.println("FlatMap 2:" + Thread.currentThread().getName());
Thread.sleep(1000l);
subscriber.onNext(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
})
.subscribe(s -> {
System.out.println("Subscriber:" + Thread.currentThread().getName());
},
throwable -> {
System.out.println(throwable.getMessage());
});
}
public static void main(String args[]) throws InterruptedException {
MapObservable.flatMapWithMultipleThread();
System.out.println("Before Stop");
//睡眠一段时间以等待所有的输出
Thread.sleep(5000l);
}
最终输出结果为:
FlatMap 1:main
FlatMap 2:main
Subscriber:main