新的一周开始了,大家继续加油呦!
本篇来自 yalinfendou 的投稿,分享了自己实现RxJava的心得,希望大家喜欢!
yalinfendou 的博客地址:
https://blog.csdn.net/yalinfendou
在过去和今年的谷歌IO大会上,第一次接触到RxJava时,被其优雅的链式调用风格和强大的操作符深深吸引,RxJava一路调用,一气呵成,用很简洁的代码轻松处理复杂的逻辑,一旦喜欢上就爱不释手。不仅如此,RxJvava还能在事件的传递过程中对事件进行各种加工处理,简直无与伦比。后来开始尝试阅读源码,当GET到部分心法要诀时,蓦然回首,原来想要造一个RxJava并不是很难,于是便有了此篇。希望你读完后,能够加深对RxJava的理解,并能深深地喜欢上RxJava。
网上关于RxJava 的文章很多。这里相关的使用方式不作详细介绍,如果你对基本用法还不熟悉,请先移步:GitHub(https://github.com/ReactiveX/RxJava) 或者 扔物线(http://gank.io/post/560e15be2dca930e00da1083)
本篇示例源码Git地址,建议下载Demo示例一起阅读。本篇涉及到相关源码基于RxJava 2.1.1。
为了能更好的理解后续实现逻辑流程,我们先简单梳理一下RxJava的基本概念和角色。
1. RxJava的观察者模式
Observable :被观察者,用来生产发送事件;
Observer:观察者,接收被观察者传来的事件;
Event:包装事件发送中的消息,在事件的传递过程中,可以通过操作符对事件进行各种加工(转换,过滤,组合……);
Subscribe:被观察者和观察者通过订阅产生关系后,才具备事件发送和接收能力;
2. RxJava的三个动作:
onNext()
onError()
onCompleted()
Observable 负责发出动作事件,这些事件经过一些分发处理流程后,Observer 负责接收对应的事件并消费掉。
再看一下Git官网的介绍:
RxJava – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
自己总结一下:RxJava 是以观察者模式为核心,可以通过强大的操作符,对事件中的消息进行加工包装,并且可以轻松实现线程调度的一个框架。
请你务必理解上面的几点,对后续的代码理解实现真的非常重要!
1. 基本订阅实现
首先我们先来实现角色一Observer
,用来接收事件消息,模仿源码,我们也定义四个方法:
接着再实现角色二Observable
,不过这里定义的ObservableSource
是一个基类接口,里面只提供了用来关联观察者和被观察者的方法:subscribe
。
你或许有些疑问,在传统的观察者模式里面,大都是由Observable
直接发出通知事件的,为什么上面没看到发送事件的方法呢?先不要急,在RxJava里面,其实是通过一个发射器对象Emitter
,把事件发出去的。那我们接着再看Emitter
。
是不是和前面的Observer
中定义的方法很相似?
最后,我们再来看看Observable
到底怎么通过Emitter
把事件给发出去的。
其实所有的一切都在核心类ObservableCreate
里面,当调用observable.subscribe(observer)
之后,立马会进入subscribeActual
方法,可以看到在subscribeActual
方法里面,有一句source.subscribe(emitter)
,
这句执行后,Emitter
中发出的事件最后就会分发给Observer
。
public final class ObservableCreate<T> extends Observable<T> {
//source 为create 中创建的ObservableOnSubscribe对象
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//传入的observer为被订阅的观察者
CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
//通知观察者被订阅,
observer.onSubscribe();
try {
//emitter开始执行,其发出的事件会传递到observer
RLog.printInfo("emitter开始发送事件");
source.subscribe(emitter);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 把Emitter发出的事件分发给observer
* @param <T>
*/
static final class CreateEmitter<T> implements ObservableEmitter<T> {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
CheckUtils.checkNotNull(t, "onNext called parameter can not be null");
observer.onNext(t);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onComplete() {
observer.onComplete();
}
@Override
public ObservableEmitter<T> serialize() {
return null;
}
}
}
好了,先来小试一下,这里先暂且不用链式代码。
通过Log,我们再回头梳理一下整个订阅以及发送事件的流程:
首先通过Observable.create创建一个ObservableCreate
对象并返回,完成订阅Observer
后,再创建一个发射器 CreateEmitter
对象,通过这个Emitter
,把事件传递给Observer
,于是Observable
中生产的事件就分发到Observer
了。
在RxJava源码中,调用observable.subscribe(observer)
后,紧接着会执行ObservableCreate
类中的subscribeActual
方法,接着调用source.subscribe(emitter)
,此时Observable
才会开始发事件,通过发射器Emitter把onNext
,onError
,onComplete
发送给被订阅的observer
,从而完成整个事件流的分发处理。
注意:RxJava中的观察者模式
有别于传统的观察者模式
,只有Observable
完成订阅Observer
之后,Observable
才会发出事件
2. Map操作符实现
Map
一般用于对事件中的消息进行加工处理,只能一对一转换。
想要拥有转换的能力,那么必然会有一个能够把Source
转换成Result
的方法。
我们来看一下RxJava源码中的这个转换接口 Function
。
T
表示输入值,R
表示输出值,把 T
转换成 R
。
另外还有重要的一点,我们知道,RxJava拥有逐级订阅的能力,所以每次经过操作符后,返回的必然是一个Observable
对象。
所以在调用 Observable.map()
后,返回的肯定也是一个Observable
。
在实现ObservableMap
之前,我们归纳一下ObservableMap
实现要点:
必须继承
Observable
;拥有转换能力,能通过
Function.apply
方法,把原始数据转换成目标数据;
开始实现吧,代码并不复杂,请留意注释:
public class ObservableMap<T, U> extends Observable<U> {
final Function<? super T, ? extends U> function;
final ObservableSource<T> source; //source 为create 中创建的ObservableOnSubscribe对象
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
this.source = source;
this.function = function;
}
public final ObservableSource<T> source() {
return source;
}
@Override
public void subscribeActual(Observer<? super U> observer) {
//传入的observer为被订阅的观察者
// mapObserver也是一个Observer对象,起到了桥接source(被观察者)和Observer(观察者)的作用,
// mapObserver中的事件最终会分发到传入的observer,在apply方法中,把传入的泛型转成R,这样就完成了map转换的功能
MapObserver mapObserver = new MapObserver<T, U>(observer, function);
//source订阅mapObserver之后 ,订阅成功后,source的emitter中的事件会分发给mapObserver,
// mapObserver通过apply方法,把传入的泛型T转成结果R,再通过onNext发送给真正的观察者actual,这样就完成了事件消息的传递和转换
source.subscribe(mapObserver);
}
static final class MapObserver<T, U> implements Observer<T> {
protected final Observer<? super U> actual;
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onSubscribe() {
RLog.printInfo("ObservableMap: onSubscribe");
}
@Override
public void onNext(T t) {
CheckUtils.checkNotNull(t, "onNext called parameter can not be null");
U v = null;
try {
v = mapper.apply(t);
} catch (Exception e) {
e.printStackTrace();
}
actual.onNext(v);
}
@Override
public void onError(Throwable error) {
actual.onError(error);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
}
其实真正的核心就这两句:
这里真正管事的是MapObserver
,完成订阅后,上一级的Observable
对象把事件发给了mapObserver
,mapObserver
又在它的onNext()
方法里面,把事件消息转换了一下,然后又发送了出去。有没有一种豁然开朗的的感觉……
看一下效果:
可以看到,emitter.onNext()
发送的 “1” 在apply
方法中被转成了“A1”,最终被observer
接收到。
这里和基本订阅实现的流程做一下对比梳理:
这里仍然是通过
Observable.create
创建了一个ObservableCreate
对象并返回;在基本订阅实现流程中,返回的
ObservableCreate
对象会直接订阅Observer
,事件会直接传递给Observer
;而在
ObservableMap
中,Observable.create
返回的ObservableCreate
对象订阅了一个MapObserver
对象,这个MapObserver
对象起到了桥接的作用;完成订阅后,
Observable
把事件传递给MapObserver
,MapObserver
通过apply
方法,把传入的泛型T
转成结果R
,再通过onNext
发送给真正的Observer
,这样就完成了事件消息的转换和传递;
3. FlapMap操作符实现
FlapMap
也是一个变换操作符,可以实现1对n的转换,被订阅的observer
可以接受n次事件消息。
仍然像上面一样,归纳一下FlatMap
实现要点:
必须继承
Observable
;拥有1对n的转换能力
1. 必须要拥有装载n个数据的一个容器;
2. 拥有发送n次的能力;
我们首先来获取装载n个数据
的能力,为了便于理解,这里先把使用的示例代码提前贴出来。可以看到,apply
方法的返回值,一个是Iterable
,一个是 array[]
数组,通过这两种数据容器,我们便拥有了两种不同方式装载数据的能力。
有了数据容器后,我们还要能把容器里的数据拿出来使用。
下面是ObservableFlapMapIterable
的实现,另外一个ObservableFlapMapArray
的实现和它大同小异,这里就不贴了。
仍然贴出核心代码:
可以看到,我们通过apply
方法,拿到装载n个数据的容器,然后再依次遍历,最后调用真正的观察者actual
的onNext()
方法,就这样实现了消息“1对n”的转换和发送。
如果你阅读RxJava源码,会发现它的实现和上面的实现有些区别:
1. RxJava apply
返回的是一个Observable
对象,在ObservableFromIterable
里面有一个Iterable source
,在ObservableFromArray
里面有一个T[] array
,RxJava就是通过这两个容器来装载数据的。
2. 在上面的实现中,我们是通过apply直接拿到数据容器,在RxJava的ObservableFlapMap
源码中有一个MergeObserver
和InnerObserver
,在InnerObserver
中,SimpleQueue
类型的 queue
变量,用来存储被加工后的数据集合,这个变量通过ObservableFromIterable
中source.onSubscribe(d)
被赋值,最终仍然是通过遍历操作,把数据再次发送出去。相关的逻辑实现比较复杂,这里就不多述了。
看看效果吧:
4. Zip操作符实现
我们继续来实现Zip操作符。
Zip操作符可以把多个Observable
发送的事件重新组合成一个新的事件,再发送出去。
明白了这一点,就可以归纳它的实现要点了:
1. 必须继承
Observable
;2. 拥有获取n个
Observable
发送事件的能力;3. 拥有合并数据并再发送的能力;
需要明确的是:再次发送的事件数量和发送事件少的那个Observable
事件数一样。
我们先来获取第2点的能力,这里让暂且n=2,只要你愿意,你让它等于多少都行。先看一下定义的接口:
t1
表示第一个observer
的泛型参数 , t2
表示第二个observer
的泛型参数,最后转换成结果R
。
再来获取第3点合并数据并再发送的能力,看看ObservableZip
是怎么实现的:
public class ObservableZip<T, R> extends Observable<R> {
BiFunction<? super Object, ? super Object, R> biFunction;
final ObservableSource<? extends T>[] sources;
public ObservableZip(ObservableSource<? extends T>[] sources, BiFunction<? super Object, ? super Object, R> biFunction) {
this.sources = sources;
this.biFunction = biFunction;
}
@Override
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, sources, biFunction);
zc.subscribe();
}
static final class ZipCoordinator<T, R> {
final Observer<? super R> actual;
final ObservableSource<? extends T>[] sources;
final BiFunction<? super Object, ? super Object, R> biFunction;
final ZipObserver<T, R>[] observers;
final T[] row;
ZipCoordinator(Observer<? super R> actual, ObservableSource<? extends T>[] sources,
BiFunction<? super Object, ? super Object, R> biFunction) {
this.actual = actual;
this.sources = sources;
this.biFunction = biFunction;
this.observers = new ZipObserver[sources.length];
this.row = (T[]) new Object[sources.length];
}
public void subscribe() {
int len = observers.length;
for (int i = 0; i < len; i++) {
observers[i] = new ZipObserver<T, R>(this);
}
//通知观察者被订阅,
actual.onSubscribe();
for (int i = 0; i < len; i++) {
sources[i].subscribe(observers[i]);
}
}
public void drain() {
final T[] os = row;
outer:
for (; ; ) {
int length = observers.length;
for (int i = 0; i < length; i++) {
ZipObserver<T, R> zipObserver = observers[i];
Queue<T> queue = zipObserver.queue;
if (queue.isEmpty()) {
if (observers[i].done) {
actual.onComplete();
}
break outer;
}
if (i == 1) {
os[0] = observers[0].queue.poll();
os[1] = observers[1].queue.poll();
if (null != os[0] && null != os[1]) {
try {
R result = biFunction.apply(os[0],os[1]);
actual.onNext(result);
Arrays.fill(os, null);
} catch (Exception e) {
e.printStackTrace();
actual.onError(e);
}
}
}
}
}
}
}
static final class ZipObserver<T, R> implements Observer<T> {
final ZipCoordinator<T, R> parent;
final Queue<T> queue = new LinkedBlockingQueue<>();
volatile boolean done;
ZipObserver(ZipCoordinator<T, R> parent) {
this.parent = parent;
}
@Override
public void onSubscribe() {
}
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
@Override
public void onError(Throwable t) {
done = true;
parent.drain();
}
@Override
public void onComplete() {
done = true;
parent.drain();
}
}
}
我们梳理一下上面代码的的主要逻辑流程:
RxJava源码中ObservableZip
的逻辑实现比较复杂,涉及到的类和接口也比较多。上面的实现算是一个精简版,但是完全能实现我们想要的功能。
再来看一下测试效果吧:
可以看到,observable1
和observable2
的onNext 事件确实是被合并后,再次发了出去。
还有一点需要注意,observable1
发送完了两个onNext
之后,observable2
才开始发送,为什么?因为它俩跑在同一个线程里面!我们接下来就要让它俩在不同的线程里面跑。
5. 线程调度subscribeOn和observeOn的实现
RxJava是通过Scheduler
来切换线程的。常用的几个内置线程调度器如下:
Schedulers.io()
代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
Schedulers.computation()
代表CPU计算密集型的操作, 例如需要大量计算的操作;
Schedulers.newThread()
为每个任务创建一个新线程;
AndroidSchedulers.mainThread()
代表Android的主线程;
上面只是概念介绍,我们必须要弄明白如何去实现!
1. 在Java开发中,开启一个线程,用
Thread
,Runnable
,Callable
都可以实现,可总不能随意开启野线程放任不管吧,所以肯定要用线程池;2. 在Android开发中,切换到UI线程,可以通过调用
Handler.post(Runnable r)
实现。这里的参数是个Runnable
对象,Executor.execute(Runnable command)
参数也是一个Runnable
对象,所以,我们可以很方便的把各个Scheduler
的线程调度方法统一起来;3. 无论是
subscribeOn()
还是observeOn()
,返回的肯定也是Observable
对象;
为了更好的理解Scheduler
线程调度的原理,我不厌其烦地把Map
操作符的实现原理再贴一遍:
在ObservableMap
中,Observable.create
返回的ObservableCreate
对象订阅了一个MapObserver
对象,完成订阅后,把事件传递给MapObserver
,MapObserver
又通过apply
方法,把传入的泛型T
转成结果R
,再通过onNext
发送给真正的Observer
,这样就完成了事件消息的传递和转换。
为了让Observable
发出的事件在新线程中执行,只要把“订阅”这个动作放入新的线程,emitter
发出的事件也就自然在新线程里面执行了。
实践是检验真理的唯一标准,我们先用野线程在ObservableFlapMap里面测试一下,
改造一下ObservableFlapMap#subscribeActual
方法试试看:
public void subscribeActual(Observer<? super U> observer) {
final MergeObserver mapObserver = new MergeObserver<T, U>(observer, function);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
source.subscribe(mapObserver);
}
});
thread.setName("new Observable Thread");
thread.start();
}
上面的代码中,我们只是把source.subscribe(mapObserver)
放到了一个子线程中。
通过log,可以看到生产事件
的动作,已经在新线程里面执行了。
我们还要让消费事件
的动作,也在新的线程中执行,很自然地会想到把actual.onNext(t)
,actual.onError(error)
,actual.onComplete()
这三个方法放到新线程中执行。
仍然用野线程在ObservableFlapMap
中做测试:
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
actual.onNext(t);
}
});
thread.setName("new Observer Thread");
thread.start();
观察log,可以发现Observer
的onNext()
跑在了新开启的线程里面。
如果你明白了上面线程调度的实现原理,那么我们再依葫芦画瓢,造几个线程调度器:IoScheduler
,NewThreadScheduler
,AndroidSchedulers
,源码就不再贴了,这里拿上面 observableZip
的例子测试一下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
RLog.printInfo("observable1 emitter发送第一个onNext,value = 1");
emitter.onNext(1);
RLog.printInfo("observable1 emitter发送第二个onNext,value = 2");
emitter.onNext(2);
RLog.printInfo("observable1 emitter发送onComplete");
emitter.onComplete();
}
}).subscribeOn(Schedulers.NEW_THREAD);
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
RLog.printInfo("observable2 emitter发送第一个onNext,value = A");
emitter.onNext("A");
RLog.printInfo("observable2 emitter发送第二个onNext,value = B");
emitter.onNext("B");
//RLog.printInfo("observable2 emitter发送onComplete");
//emitter.onComplete();
}
}).subscribeOn(Schedulers.IO);
Observable<String> observableZip = Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(Schedulers.ANDROID_MAIN_THREAD);
observableZip.subscribe(new Observer<String>() {
@Override
public void onSubscribe() {
RLog.printInfo("Observer被订阅");
}
@Override
public void onNext(String value) {
RLog.printInfo("Observer接收到onNext,被Zip转换之后的value = " + value);
}
@Override
public void onError(Throwable e) {
RLog.printInfo("Observer接收到onError,errorMsg = " + e.getMessage());
}
@Override
public void onComplete() {
RLog.printInfo("Observer接收到onComplete");
}
});
D/RxJava: [Thread: RxJava New Thread #1]_emitter开始发送事件
D/RxJava: [Thread: RxJava IO Thread #1]_emitter开始发送事件
D/RxJava: [Thread: RxJava New Thread #1]_observable1 emitter发送第一个onNext,value = 1
D/RxJava: [Thread: RxJava IO Thread #1]_observable2 emitter发送第一个onNext,value = A
D/RxJava: [Thread: RxJava New Thread #1]_observable1 emitter发送第二个onNext,value = 2
D/RxJava: [Thread: RxJava New Thread #1]_observable1 emitter发送onComplete
D/RxJava: [Thread: RxJava IO Thread #1]_observable2 emitter发送第二个onNext,value = B
D/RxJava: [Thread: main]_Observer被订阅
D/RxJava: [Thread: main]_Observer接收到onNext,被Zip转换之后的value = 1A
D/RxJava: [Thread: main]_Observer接收到onNext,被Zip转换之后的value = 2B
D/RxJava: [Thread: main]_Observer接收到onComplete
@Override
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, sources, biFunction);
zc.subscribe();
}
我们可以看到,在最前面的zip
的例子中,observable1
发送完两个onNext
之后,observable2
才开始发送,因为它俩在同一个线程。而上面的zip例子中,observable1
和observable2
是依次发送的,因为在它俩跑在不同的线程。
再多次调用subscribeOn()
和observeOn()
看看:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
RLog.printInfo("emitter发送第一个onNext,value = 1");
emitter.onNext(1);
RLog.printInfo("emitter发送onComplete");
emitter.onComplete();
}
}).subscribeOn(Schedulers.NEW_THREAD)
.subscribeOn(Schedulers.IO)
.subscribeOn(Schedulers.NEW_THREAD)
.observeOn(Schedulers.IO)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
RLog.printInfo("切换线程");
return "切换线程" + integer;
}
}).observeOn(Schedulers.ANDROID_MAIN_THREAD)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe() {
RLog.printInfo("Observer被订阅");
}
@Override
public void onNext(String value) {
RLog.printInfo("Observer接收到onNext,被转换之后的value = " + value);
}
@Override
public void onError(Throwable e) {
RLog.printInfo("Observer接收到onError,errorMsg = " + e.getMessage());
}
@Override
public void onComplete() {
RLog.printInfo("Observer接收到onComplete");
}
});
D/RxJava: [Thread: RxJava New Thread #1]_emitter开始发送事件
D/RxJava: [Thread: RxJava New Thread #1]_emitter发送第一个onNext,value = 1
D/RxJava: [Thread: RxJava New Thread #1]_emitter发送onComplete
D/RxJava: [Thread: RxJava IO Thread #1]_切换线程
D/RxJava: [Thread: main]_Observer接收到onNext,被转换之后的value = 切换线程1
D/RxJava: [Thread: main]_Observer被订阅
D/RxJava: [Thread: main]_Observer接收到onComplete
通过观察Log,可以发现以下两点:
1. 当多次调用subscribeOn()
时,只有第一个subscribeOn()
起作用。
上面的例子中,连续3次调用
subscribeOn(Schedulers.NEW_THREAD) .subscribeOn(Schedulers.IO) .subscribeOn(Schedulers.NEW_THREAD)
其实线程也是切换了三次,只不过最后一次切换成了第一个
subscribeOn()
指定的线程,所以只有第一个真正起到了作用。
2. 每次调用observeOn
,都会切换一下线程。
这个比较好理解,因为每次调用都会影响后面观察者运行的线程,线程改变后,会在新的线程中将数据发送给的
Observer
。
RxJava为了保障优雅性,健壮性,源码比这复杂庞大得的多。这里只是抛砖引玉,通过研究别人的轮子,弄懂造轮子的原理,提升自己,然后才能造出更好的轮子。
如果你明白了以上操作符的实现原理,那么其它的诸如filter
,sample
,take
,takeLast
,distinct
等操作符,相信也可以实现了。如果没看懂,也没关系,多看几遍,多动手写写试试,相信你也能体会到RxJava的真正魅力!
喜欢 就关注吧,欢迎投稿!
如有任何疑问可在文章底部留言。为了防止恶意评论,本博客现已开启留言审核功能。但是博主会在后台第一时间看到您的留言,并会在第一时间对您的留言进行回复!欢迎交流!
本文链接: https://leetcode.jp/自己动手来实现一个rxjava/