Rxjava源码分析-一次完成的事件消费过程

公司在项目上用Rxjava大概也有几个月了,从之前学习Rxjava到应用,虽然能自如使用Api,但细节原理性掌握并不好。因此写下学习源码的过程,好记性不如烂笔头,慢慢学习和积累。
文章不对基本概念做阐述,一切尽在google,侧重掌握整个流程及细节逻辑。

Tip 源码版本:1.2.1
compile ‘io.reactivex:rxjava:1.3.0’
compile ‘io.reactivex:rxandroid:1.2.1’
Rxjava的简介及api适用范围可参考 Rxjava api使用范围

observable

在as中直接对Observable使用Ctrl+F12搜索方法。常用的构建Observable对象的方法大概有:just/from/startWIth/interval/repeat/defer/range,还有一个核心的方法create
上述所有方法最终都会调用create方法实现。在rxjava1.2版本,可直接调用create方法构建.在之后的版本里,官方并不推荐这么做。
create实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//1
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

//2
public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));
}

//3
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

//4
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
return unsafeCreate(syncOnSubscribe);
}

//5
@Beta
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
return unsafeCreate(asyncOnSubscribe);
}

方法1中,从官方的角度看不推荐直接传递OnSubscribe对象直接构建Observable,至于原因后续文章会讲,这里只看当前create的使用。
方法2/4/5最终调用的是3方法,从方法名上可知是一种unsafe的构建做法,与后续实现的backpressure背压有关系。
方法5从官方的解释中看到

@Beta
APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed in any major or minor release but not in a patch release. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

大概的意思就是该api随时会改变,app正式版本谨用,最终所有构建方法都会调用

1
2
3
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

RxjavaHooks 是啥呢?根据RxjavaHooks类注解

Utility class that holds hooks for various Observable, Single and Completable lifecycle-related
points as well as Scheduler hooks.
The class features a lockdown state, see {@link #lockdown()} and {@link #isLockdown()}, to
prevent further changes to the hooks.
@since 1.3

soga,是一个工具类,用于保证Observable构建过程的生命周期控制及调度控制,进一步看下去

1
2
3
4
5
6
7
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}

这里不深究hook的任何操作,如有机会后续深入去了解hook在控制rxjava事件流中的作用。实际上方法返回参数onSubscribe对象。至此,我们只需要记住,Observable是按照这种方式构建出来。

OnSubscribe

OnSubscribe到底是什么呢?OnSubscribe实际上定义于Observable类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//1
public interface Function {}

//2
public interface Action extends Function {}

//3
public interface Action1<T> extends Action {
void call(T t);
}

//4
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}

直观看出OnSubscribe是一个连接Subscriber的接口,在call方法中把Subscriber作为参数传递。而Subscriber是一个观察者,
Observable是一个被观察者,于是有一个大胆的想法,若基于观看者模式而言,以下伪代码可模拟事件响应流程

1
2
3
4
5
6
7
//步骤1,假设事件对象T,产生T
T t = new T();

//步骤2,假设处理观察者响应事件,
Observable.onSubscribe#call(Subscribe subscribe){
subscribe.handle(t);
}

到这里,我们可以认为:OnSubscribe是一个连接观察者和被观察者的枢纽,也是事件源产生之后,观察者被回调的重要调度者。后半句话怎么理解呢?慢慢往下看。

subscriber

Subscriber,rxjava中常用的观察者,看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//1
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
//2
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
//3
public abstract class Subscriber<T> implements Observer<T>, Subscription {
//......
public void onStart() { }
}

接口1提供是否取消订阅的行为。
接口2为标准观察者相应事件后的行为。显然,subscriber为rxjava中处理事件流的积累,可能自由取下订阅与处理事件,onStart方法用于事件处理前的行为。先忽略Subscriber是如何处理事件,先回到上文。拥有了Observable对象和OnSubscribe枢纽,Observable对象如何拿到Subscriber的对象引用呢?
Observable类源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   //1
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

//2
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

//2.1
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}

//2.2
subscriber.onStart();

//2.3
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}

try {
//2.4 RxJavaHooks#onObservableStart返回onSubscribe对象
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
//...省略部分代码
}
return Subscriptions.unsubscribed();
}

方法1中传递Subscriber对象调用方法2,源码中还有该类重载了很多subscribe方法,这里只看最后调用的核心方法。
在方法2中

  1. Subscriber对象做非空判断保护
  2. 事件处理前的操作
  3. Subscriber对象做安全保护,遵循响应策略
  4. OnSubscribe调用call方法。
    从上述过程可知,当Observable对象调用subscribe 方式时,其持有的 OnSubscribe对象会调用call方法。而 call方法里面的具体实现则得看 OnSubscribe子类覆盖方法逻辑。至此,观察者/被观察者/枢纽都有了,整个事件回调链算完整了,下面看一下整个完整的过程。

example

just构建Observable为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just("a","b")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
System.out.print(s);
}
});

运行结果

1
a b

按照前三节的逻辑

  1. Observable.just(“a”,”b”)创建被观察者Observable对象
  2. observable#subscriber(匿名构建Subscriber对象) 创建观察者Subscriber对象并通过subscribe连接
  3. OnSubscribe调用call方法
    从打印ab猜想,call方法中应该把”a”,”b”参数传递给Subscriber对象并回调onNext方法,所以一切逻辑应该在OnSubscribe子类中定义,看源码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//1
public static <T> Observable<T> just(T t1, T t2) {
return from((T[])new Object[] { t1, t2 });
}

//2
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}

可以看到,传递给Observable构造器的是OnSubscribe子类OnSubscribeFromArray对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
}

//Subscriber类
public abstract class Subscriber<T> implements Observer<T>, Subscription {
 //省略其他代码
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
//1
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
if (toRequest == NOT_SET) {
passToSubscriber = true;
}
}
}
//2
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
//3
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}

首先我们传递的a b被构建成Array对象传递给OnSubscribeFromArray。在call方法调用的时候为Subscriber设置了Producer对象,何为Producer呢?

Producer对象,主要用于生产并控制事件有序的发射。这里大概简单理解如此即可。

Subscriber设置了FromArrayProducer数据生产对象中,setProducer方法逻辑为

  1. 代码1中requested表示已经请求的数据,默认是NOT_SET。所以第一次会进入锁住的代码块,第二次则进入else
  2. 代码2中如果没有设置请求数目,则默认请求Long.MAX_VALUE数据。

看看FromArrayProducer如何实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//1
public interface Producer {
void request(long n);


//2
static final class FromArrayProducer<T> extends AtomicLong implements Producer {
final Subscriber<? super T> child;
final T[] array;
int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
//2.1
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

void fastPath() {
final Subscriber<? super T> child = this.child;
//2.2
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(t);
}

//2.3
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

void slowPath(long r) {
//省略
}
}

上述代码可知

  1. 代码1定义了一个producer接口,提供请求数据requset方法
  2. 代码2实现producer接口,关联一个观察者subscriber对象,但是为什么数据的生产却关联观察者而不被观察者呢?实际上这里理解为数据生产个人认为是偏差的。正确的理解应该是按照观察者意愿索要的数据提供者。array实际上包含了所有存在的事件对象,而把所有事件对象传递给producer对象,由subscriber对象决定要request多少事件对象
  3. 代码2.1中发现请求数n为Long.MAX_VALUE,默认发射所有数据
  4. 代码2.2中默认传递所有事件对象给subscriber对象回调
  5. 代码2.3中判断是否取消订阅,如果取消订阅,则直接返;否则则完成事件回调后调用onCompleted,遵循响应策略
    至此,我们回顾下整个过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just("a","b")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
System.out.print(s);
}
});

调用链如下
observable#subscribe
-->Onsubscriber#call
---->Subscribe#setProducer
------>FromArrayProducer#request -> #fastPath
-------->Subscribe#onNext -> #onCompleted

至此,一次完成的事件消费过程已完成。