原理浅析

RxJava 原理与机制浅析

ReactiveX 框架(基于 RxJava)实现原理浅析

RxJava 基本流程和 lift 源码分析

RxJava 原理解析

Subscribe

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了 3 件事:

  1. 调用 Subscriber.onStart()。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber)。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

整个过程中对象间的关系如下图:

关系静图

或者可以看动图:

关系静图

除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber,并使用 onNextAction、onErrorAction 和 onCompletedAction 来定义 onNext()、onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1Action0Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的闭包。Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj)onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,ObserverSubscriber 具有相同的角色,而且 Observersubscribe() 过程中最终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer,这样更加严谨。

Transform

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

  • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。

  • 当含有 lift() 时:

    当含有 lift() 时: 1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;

    当含有 lift() 时: 1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe

    当含有 lift() 时: 1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe

    当含有 lift() 时: 1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;4.而这个新 OnSubscribecall() 方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的变换代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

    当含有 lift() 时: 1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;4.而这个新 OnSubscribecall() 方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的变换代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。 这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

如果你更喜欢具象思维,可以看图:

lift() 原理图

或者可以看动图:

lift 原理动图

两次和多次的 lift() 同理,如下图:

两次 lift

举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

讲述 lift() 的原理只是为了让你更好地了解 RxJava,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。