RxJava(一)基本流程源码浅析

RxJava(一)基本流程源码浅析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
System.out.println("bbb");
e.onNext(1);
e.onNext(2);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("aaa");
}
@Override
public void onNext(Integer integer) {
System.out.println(String.valueOf(integer));
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
等价于
ObservableOnSubscribe<Integer> A = new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
System.out.println("bbb");
e.onNext(1);
e.onNext(2);
e.onComplete();
}
};
Observer<Integer> B = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("aaa");
}
@Override
public void onNext(Integer integer) {
System.out.println(String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}; Observable.create(A).subscribe(B);

上面是一段基本的Rxjava示例代码。用久了Rxjava之后就会有几个问题提出。

  1. ObservableEmitter中的数据是怎么传递到下游的?
  2. 为什么onErroronComplete互斥?
  3. onSubscribe和其他方法的执行顺序是什么?

基本流程分析

1. Observable.create()

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

当我们执行这个代码的时候,创建了一个A对象。传入的是一个ObservableOnSubscribe匿名对象。ObservableOnSubscribe是一个接口,含有一个subscribe方法。

1
2
3
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

这个A对象会传入ObservableCreate进行存储。

2. subscribe

当执行subscribe方法的时候,内部执行的是ObservablesubscribeActual方法。这个方法在之前的ObservableCreate有定义。

1
2
3
4
5
6
7
public final void subscribe(Observer<? super T> observer) {
try {
...
subscribeActual(observer);
}
...
}

所以我们执行.subscribe的时候实际上执行的是ObservableCreatesubscribeActual方法,即执行我们创建的A对象的subscribe方法。

1
2
3
4
5
6
7
8
9
@Override protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

这个方法首先用.subscribe()里的B对象实例化一个CreateEmitter对象。然后调用了B对象的onSubscribe方法。所以在上面的流程中,onSubscribe会最先调用。
这个source.subscribe(parent);中的source就是我们传入的A对象。意思是到执行了这一步,实际是走的是A对象的subscribe方法了。即我们写的这段代码。

1
2
3
4
System.out.println("bbb");
e.onNext(1);
e.onNext(2);
e.onComplete();

e.onNext(1);发送的数据去哪了?是什么时候跑到了下面的流程执行的?

3. CreateEmitter

在执行了subscribe调用了ObservableCreatesubscribeActual之后。利用下面流程的B对象创建了CreateEmitter对象。

1
2
CreateEmitter(Observer<? super T> observer) {
this.observer = observer; }

e.onNext(1);调用的时候,实际上执行的是CreateEmitteronNext方法。这个方法会执行B对象的onNext方法。所以到了这,大致流程已经可以理通了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false; }

查看上面的代码可以很容易看出,onCompleteonError互斥的原因。