变换操作

Transforming(转换)

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

Map/flatMap

MapflatMap的用法相近,都可以将某个Observable转化为另一个Observable,不过它们的区别如下:

  • map为一对一变换。可以将一个对象转换成另一个对象,或者将对象数组的每单个对象转换成新的对象数组的每单个对象。
  • flatMap()为一对多变换。可以将一个对象转换成一组对象,或者将对象数组的每单个对象转换成新的对象数组的每单组对象。flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

flatMap() 的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个Observable,而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象铺平之后通过统一路径分发了下去。而这个铺平就是 flatMap() 所谓的flat

Person为例,一个Person对应一个身份证id,一个Person可以有多个Email。通过map()可以将Person转换成id,从而得到一个Person的身份证号码;通过flatMap()可以将Person转换成一组Email,从而得到一个Person的所有Email

public class MapObservable {
  public static String[] names = new String[] { "A", "B", "C", "D" };

  public static void mapNameToId() {
    Observable
      .from(names)
      .map(
        s -> {
          return s.hashCode();
        }
      )
      .subscribe(
        integer -> {
          System.out.println("Person Id is :" + integer);
        }
      );
  }

  public static void mapNameToEmails() {
    Observable
      .from(names)
      .flatMap(
        s -> {
          return Observable.from(new String[] { s + "@b.com", s + "@a.com" });
        }
      )
      .subscribe(
        s -> {
          System.out.println("Person Id is :" + s);
        }
      );
  }

  public static void main(String args[]) {
    MapObservable.mapNameToId();

    MapObservable.mapNameToEmails();
  }
}

Exception Handler

当我们利用flatMap来链式调用Observable时,可能出现在中间的某个Observable抛出异常的情形,本例即是演示这种可能存在异常的情形,如下:


public static void flatMapWithException() {
    Observable.from(names).
            flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    subscriber.onError(new Exception("Custom Exceptions"));
                });
            })
            .flatMap(s -> {
                System.out.println("In FlatMap 3");
                return Observable.from(new String[]{s + "@b.com", s + "@a.com"});
            })
            .subscribe(s -> {
                        System.out.println("Person Id is :" + s);
                    },
                    throwable -> {
                        System.out.println(throwable.getMessage());
                    });
}

最终的输出结果直接就是:

Custom Exceptions

可以看出,一旦某个Observable抛出异常,那么会直接进入最后的Subscriber

Multiple Threads

RxJava本身最大的优势即是可以进行便捷明晰的并发编程,当我们在连接多个Observable时候,可以选择将某些耗时较长的Observable放到子线程中运行,测试代码如下:


/**
 * @function 演示在不同线程中的flatMap
 */
public static void flatMapWithMultipleThread() {
    Observable.from(new String[]{"name"}).
            flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    System.out.println("FlatMap 1:" + Thread.currentThread().getName());
                    subscriber.onNext(s);
                });
            })
            .flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    try {
                        System.out.println("FlatMap 2:" + Thread.currentThread().getName());
                        Thread.sleep(1000l);
                        subscriber.onNext(s);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            })
            .subscribe(s -> {
                        System.out.println("Subscriber:" + Thread.currentThread().getName());
                    },
                    throwable -> {
                        System.out.println(throwable.getMessage());
                    });
}

public static void main(String args[]) throws InterruptedException {

    MapObservable.flatMapWithMultipleThread();

    System.out.println("Before Stop");

    //睡眠一段时间以等待所有的输出
    Thread.sleep(5000l);

}

最终输出结果为:


FlatMap 1:main
FlatMap 2:main
Subscriber:main

buffer/window:时间平移

下一页