原理浅析
RxJava 原理与机制浅析
Subscribe
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
可以看到,subscriber() 做了 3 件事:
- 调用 Subscriber.onStart()。这个方法在前面已经介绍过,是一个可选的准备方法。
- 调用 Observable中的OnSubscribe.call(Subscriber)。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
- 将传入的 Subscriber作为Subscription返回。这是为了方便unsubscribe().
整个过程中对象间的关系如下图:
  
或者可以看动图:
  
除了 subscribe(Observer) 和 subscribe(Subscriber),subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber。形式如下:
Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};
// 自动创建 Subscriber,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber,并使用 onNextAction、onErrorAction 和 onCompletedAction 来定义 onNext()、onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
简单解释一下这段代码中出现的 Action1 和 Action0。Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的闭包。Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。
注:正如前面所提到的,
Observer和Subscriber具有相同的角色,而且Observer在subscribe()过程中最终会被转换成Subscriber对象,因此,从这里开始,后面的描述我将用Subscriber来代替Observer,这样更加严谨。
Transform
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}
这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——
- 
subscribe()中这句话的onSubscribe指的是Observable中的onSubscribe对象,这个没有问题,但是lift()之后的情况就复杂了点。
- 
当含有 lift()时:当含有 lift()时: 1.lift()创建了一个Observable后,加上之前的原始Observable,已经有两个Observable了;当含有 lift()时: 1.lift()创建了一个Observable后,加上之前的原始Observable,已经有两个Observable了;2.而同样地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe;当含有 lift()时: 1.lift()创建了一个Observable后,加上之前的原始Observable,已经有两个Observable了;2.而同样地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe;3.当用户调用经过lift()后的Observable的subscribe()的时候,使用的是lift()所返回的新的Observable,于是它所触发的onSubscribe.call(subscriber),也是用的新Observable中的新OnSubscribe,即在lift()中生成的那个OnSubscribe;当含有 lift()时: 1.lift()创建了一个Observable后,加上之前的原始Observable,已经有两个Observable了;2.而同样地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe;3.当用户调用经过lift()后的Observable的subscribe()的时候,使用的是lift()所返回的新的Observable,于是它所触发的onSubscribe.call(subscriber),也是用的新Observable中的新OnSubscribe,即在lift()中生成的那个OnSubscribe;4.而这个新OnSubscribe的call()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe,在这个call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一个新的Subscriber(Operator就是在这里,通过自己的call()方法将新Subscriber和原始Subscriber进行关联,并插入自己的变换代码以实现变换),然后利用这个新Subscriber向原始Observable进行订阅。当含有 lift()时: 1.lift()创建了一个Observable后,加上之前的原始Observable,已经有两个Observable了;2.而同样地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe;3.当用户调用经过lift()后的Observable的subscribe()的时候,使用的是lift()所返回的新的Observable,于是它所触发的onSubscribe.call(subscriber),也是用的新Observable中的新OnSubscribe,即在lift()中生成的那个OnSubscribe;4.而这个新OnSubscribe的call()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe,在这个call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一个新的Subscriber(Operator就是在这里,通过自己的call()方法将新Subscriber和原始Subscriber进行关联,并插入自己的变换代码以实现变换),然后利用这个新Subscriber向原始Observable进行订阅。 这样就实现了lift()过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
如果你更喜欢具象思维,可以看图:
  
或者可以看动图:
  
两次和多次的 lift() 同理,如下图:
  
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:
observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});
讲述
lift()的原理只是为了让你更好地了解 RxJava,从而可以更好地使用它。然而不管你是否理解了lift()的原理,RxJava 都不建议开发者自定义Operator来直接使用lift(),而是建议尽量使用已有的lift()包装方法(如map()flatMap()等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。
