原理浅析
RxJava 原理与机制浅析
Subscribe
Observable.subscribe(Subscriber)
的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber()
做了 3 件事:
- 调用
Subscriber.onStart()
。这个方法在前面已经介绍过,是一个可选的准备方法。 - 调用
Observable
中的OnSubscribe.call(Subscriber)
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Observable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()
方法执行的时候。 - 将传入的
Subscriber
作为Subscription
返回。这是为了方便unsubscribe()
.
整个过程中对象间的关系如下图:
或者可以看动图:
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber
。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber,并使用 onNextAction、onErrorAction 和 onCompletedAction 来定义 onNext()、onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
简单解释一下这段代码中出现的 Action1
和 Action0
。Action0
是 RxJava 的一个接口,它只有一个方法 call()
,这个方法是无参无返回值的;由于 onCompleted()
方法也是无参无返回值的,因此 Action0
可以被当成一个包装对象,将 onCompleted()
的内容打包起来将自己作为一个参数传入 subscribe()
以实现不完整定义的回调。这样其实也可以看做将 onCompleted()
方法作为参数传进了 subscribe()
,相当于其他某些语言中的闭包。Action1
也是一个接口,它同样只有一个方法 call(T param)
,这个方法也无返回值,但有一个参数;与 Action0
同理,由于 onNext(T obj)
和 onError(Throwable error)
也是单参数无返回值的,因此 Action1
可以将 onNext(obj)
和 onError(error)
打包起来传入 subscribe()
以实现不完整定义的回调。事实上,虽然 Action0
和 Action1
在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX
形式的接口 (例如 Action2
, Action3
) 的,它们可以被用以包装不同的无返回值的方法。
注:正如前面所提到的,
Observer
和Subscriber
具有相同的角色,而且Observer
在subscribe()
过程中最终会被转换成Subscriber
对象,因此,从这里开始,后面的描述我将用Subscriber
来代替Observer
,这样更加严谨。
Transform
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
这段代码很有意思:它生成了一个新的 Observable
并返回,而且创建新 Observable
所用的参数 OnSubscribe
的回调方法 call()
中的实现竟然看起来和前面讲过的 Observable.subscribe()
一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber)
中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——
-
subscribe()
中这句话的onSubscribe
指的是Observable
中的onSubscribe
对象,这个没有问题,但是lift()
之后的情况就复杂了点。 -
当含有
lift()
时:当含有
lift()
时: 1.lift()
创建了一个Observable
后,加上之前的原始Observable
,已经有两个Observable
了;当含有
lift()
时: 1.lift()
创建了一个Observable
后,加上之前的原始Observable
,已经有两个Observable
了;2.而同样地,新Observable
里的新OnSubscribe
加上之前的原始Observable
中的原始OnSubscribe
,也就有了两个OnSubscribe
;当含有
lift()
时: 1.lift()
创建了一个Observable
后,加上之前的原始Observable
,已经有两个Observable
了;2.而同样地,新Observable
里的新OnSubscribe
加上之前的原始Observable
中的原始OnSubscribe
,也就有了两个OnSubscribe
;3.当用户调用经过lift()
后的Observable
的subscribe()
的时候,使用的是lift()
所返回的新的Observable
,于是它所触发的onSubscribe.call(subscriber)
,也是用的新Observable
中的新OnSubscribe
,即在lift()
中生成的那个OnSubscribe
;当含有
lift()
时: 1.lift()
创建了一个Observable
后,加上之前的原始Observable
,已经有两个Observable
了;2.而同样地,新Observable
里的新OnSubscribe
加上之前的原始Observable
中的原始OnSubscribe
,也就有了两个OnSubscribe
;3.当用户调用经过lift()
后的Observable
的subscribe()
的时候,使用的是lift()
所返回的新的Observable
,于是它所触发的onSubscribe.call(subscriber)
,也是用的新Observable
中的新OnSubscribe
,即在lift()
中生成的那个OnSubscribe
;4.而这个新OnSubscribe
的call()
方法中的onSubscribe
,就是指的原始Observable
中的原始OnSubscribe
,在这个call()
方法里,新OnSubscribe
利用operator.call(subscriber)
生成了一个新的Subscriber
(Operator
就是在这里,通过自己的call()
方法将新Subscriber
和原始Subscriber
进行关联,并插入自己的变换代码以实现变换),然后利用这个新Subscriber
向原始Observable
进行订阅。当含有
lift()
时: 1.lift()
创建了一个Observable
后,加上之前的原始Observable
,已经有两个Observable
了;2.而同样地,新Observable
里的新OnSubscribe
加上之前的原始Observable
中的原始OnSubscribe
,也就有了两个OnSubscribe
;3.当用户调用经过lift()
后的Observable
的subscribe()
的时候,使用的是lift()
所返回的新的Observable
,于是它所触发的onSubscribe.call(subscriber)
,也是用的新Observable
中的新OnSubscribe
,即在lift()
中生成的那个OnSubscribe
;4.而这个新OnSubscribe
的call()
方法中的onSubscribe
,就是指的原始Observable
中的原始OnSubscribe
,在这个call()
方法里,新OnSubscribe
利用operator.call(subscriber)
生成了一个新的Subscriber
(Operator
就是在这里,通过自己的call()
方法将新Subscriber
和原始Subscriber
进行关联,并插入自己的变换代码以实现变换),然后利用这个新Subscriber
向原始Observable
进行订阅。 这样就实现了lift()
过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable
执行了 lift(Operator)
方法之后,会返回一个新的 Observable
,这个新的 Observable
会像一个代理一样,负责接收原始的 Observable
发出的事件,并在处理后发送给 Subscriber
。
如果你更喜欢具象思维,可以看图:
或者可以看动图:
两次和多次的 lift()
同理,如下图:
举一个具体的 Operator
的实现。下面这是一个将事件中的 Integer
对象转换成 String
的例子,仅供参考:
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
讲述
lift()
的原理只是为了让你更好地了解 RxJava,从而可以更好地使用它。然而不管你是否理解了lift()
的原理,RxJava 都不建议开发者自定义Operator
来直接使用lift()
,而是建议尽量使用已有的lift()
包装方法(如map()
flatMap()
等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。