RxJava(一)基本流程源码浅析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { System.out.println("bbb"); e.onNext(1); e.onNext(2); e.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("aaa"); } @Override public void onNext(Integer integer) { System.out.println(String.valueOf(integer)); } @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); 等价于 ObservableOnSubscribe<Integer> A = new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { System.out.println("bbb"); e.onNext(1); e.onNext(2); e.onComplete(); } }; Observer<Integer> B = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("aaa"); } @Override public void onNext(Integer integer) { System.out.println(String.valueOf(integer)); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; Observable.create(A).subscribe(B);
|
上面是一段基本的Rxjava
示例代码。用久了Rxjava
之后就会有几个问题提出。
ObservableEmitter
中的数据是怎么传递到下游的?
- 为什么
onError
和onComplete
互斥?
onSubscribe
和其他方法的执行顺序是什么?
- …
基本流程分析
1. Observable.create()
1 2 3 4
| public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
|
当我们执行这个代码的时候,创建了一个A对象。传入的是一个ObservableOnSubscribe
匿名对象。ObservableOnSubscribe
是一个接口,含有一个subscribe
方法。
1 2 3
| public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; }
|
这个A对象会传入ObservableCreate
进行存储。
2. subscribe
当执行subscribe
方法的时候,内部执行的是Observable
的subscribeActual
方法。这个方法在之前的ObservableCreate
有定义。
1 2 3 4 5 6 7
| public final void subscribe(Observer<? super T> observer) { try { ... subscribeActual(observer); } ... }
|
所以我们执行.subscribe
的时候实际上执行的是ObservableCreate
的subscribeActual
方法,即执行我们创建的A对象的subscribe
方法。
1 2 3 4 5 6 7 8 9
| @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
|
这个方法首先用.subscribe()
里的B对象实例化一个CreateEmitter
对象。然后调用了B对象的onSubscribe
方法。所以在上面的流程中,onSubscribe
会最先调用。
这个source.subscribe(parent);
中的source
就是我们传入的A对象。意思是到执行了这一步,实际是走的是A对象的subscribe
方法了。即我们写的这段代码。
1 2 3 4
| System.out.println("bbb"); e.onNext(1); e.onNext(2); e.onComplete();
|
那e.onNext(1);
发送的数据去哪了?是什么时候跑到了下面的流程执行的?
3. CreateEmitter
在执行了subscribe
调用了ObservableCreate
的subscribeActual
之后。利用下面流程的B对象创建了CreateEmitter
对象。
1 2
| CreateEmitter(Observer<? super T> observer) { this.observer = observer; }
|
当e.onNext(1);
调用的时候,实际上执行的是CreateEmitter
的onNext
方法。这个方法会执行B对象的onNext
方法。所以到了这,大致流程已经可以理通了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; }
|
查看上面的代码可以很容易看出,onComplete
和onError
互斥的原因。