变换操作

Transforming(转换)

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

Map/flatMap

Map 与 flatMap 的用法相近,都可以将某个 Observable 转化为另一个 Observable,不过它们的区别如下:

  • map为一对一变换。可以将一个对象转换成另一个对象,或者将对象数组的每单个对象转换成新的对象数组的每单个对象。
  • flatMap()为一对多变换。可以将一个对象转换成一组对象,或者将对象数组的每单个对象转换成新的对象数组的每单组对象。flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

flatMap() 的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象铺平之后通过统一路径分发了下去。而这个铺平就是 flatMap() 所谓的 flat。

以 Person 为例,一个 Person 对应一个身份证 id,一个 Person 可以有多个 Email。通过map()可以将 Person 转换成 id,从而得到一个 Person 的身份证号码;通过flatMap()可以将 Person 转换成一组 Email,从而得到一个 Person 的所有 Email。

public class MapObservable {
  public static String[] names = new String[] { "A", "B", "C", "D" };

  public static void mapNameToId() {
    Observable
      .from(names)
      .map(
        s -> {
          return s.hashCode();
        }
      )
      .subscribe(
        integer -> {
          System.out.println("Person Id is :" + integer);
        }
      );
  }

  public static void mapNameToEmails() {
    Observable
      .from(names)
      .flatMap(
        s -> {
          return Observable.from(new String[] { s + "@b.com", s + "@a.com" });
        }
      )
      .subscribe(
        s -> {
          System.out.println("Person Id is :" + s);
        }
      );
  }

  public static void main(String args[]) {
    MapObservable.mapNameToId();

    MapObservable.mapNameToEmails();
  }
}

Exception Handler

当我们利用 flatMap 来链式调用 Observable 时,可能出现在中间的某个 Observable 抛出异常的情形,本例即是演示这种可能存在异常的情形,如下:


public static void flatMapWithException() {
    Observable.from(names).
            flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    subscriber.onError(new Exception("Custom Exceptions"));
                });
            })
            .flatMap(s -> {
                System.out.println("In FlatMap 3");
                return Observable.from(new String[]{s + "@b.com", s + "@a.com"});
            })
            .subscribe(s -> {
                        System.out.println("Person Id is :" + s);
                    },
                    throwable -> {
                        System.out.println(throwable.getMessage());
                    });
}

最终的输出结果直接就是:

Custom Exceptions

可以看出,一旦某个 Observable 抛出异常,那么会直接进入最后的 Subscriber。

Multiple Threads

RxJava 本身最大的优势即是可以进行便捷明晰的并发编程,当我们在连接多个 Observable 时候,可以选择将某些耗时较长的 Observable 放到子线程中运行,测试代码如下:


/**
 * @function 演示在不同线程中的flatMap
 */
public static void flatMapWithMultipleThread() {
    Observable.from(new String[]{"name"}).
            flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    System.out.println("FlatMap 1:" + Thread.currentThread().getName());
                    subscriber.onNext(s);
                });
            })
            .flatMap(s -> {
                return Observable.<String>create(subscriber -> {
                    try {
                        System.out.println("FlatMap 2:" + Thread.currentThread().getName());
                        Thread.sleep(1000l);
                        subscriber.onNext(s);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            })
            .subscribe(s -> {
                        System.out.println("Subscriber:" + Thread.currentThread().getName());
                    },
                    throwable -> {
                        System.out.println(throwable.getMessage());
                    });
}

public static void main(String args[]) throws InterruptedException {

    MapObservable.flatMapWithMultipleThread();

    System.out.println("Before Stop");

    //睡眠一段时间以等待所有的输出
    Thread.sleep(5000l);

}

最终输出结果为:


FlatMap 1:main
FlatMap 2:main
Subscriber:main

buffer/window:时间平移

下一页