前言

自从去年8月底《浅谈RxJava与2.0的新特性》,已经过去快一年。笔者也没想到此文竟有读者等笔者填坑快一年了,不禁汗颜。所以笔者打算写关于一个 RxJava2 的系列文章,既为填坑,也为回报读者对我的支持。本文为第一篇。

读本系列文章,你可能有如下收获:

  • 了解其设计原理,代码实现
  • 掌握操作符的正确使用姿势,避免采坑
  • 强化 Rx 编程思想,写出更 Rx 的代码
  • 跟读精彩的源码,强化编程功底

废话不多说,进入正题。

Reactive Streams

之前在《浅谈RxJava与2.0的新特性》我们提到过, RxJava2 遵循 Reactive Streams 的编程规范, 或者更精确的说,是 RxJava2 中的 Flowable 相关的类。因此我们只分析 Flowable 相关的实现与使用,剩下的 Observable、 Completable、Single、 Maybe 这些不会再提及,相信读者朋友们可以举一反三。

Reactive Streams 中明确规范了如下4点:

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

后面笔者会用 RS 代替全称。
请跟随本系列文章慢慢看 Flowable 是出色的完成上述的要求。

阅读源码的正确姿势

Rx2 在源码中加入了一些注解,这些注解对运行没有任何实际作用,仅仅是用作标识备注,有助于开发者了解某个操作符的正确使用姿势,同时也有利于阅读源码时整理思路。这些注解位于io.reactivex.annotations包名下,这里着重介绍一个。

BackpressureSupport

BackpressureSupport 是用作标识这个操作符对背压的支持类型,有以下几种:

  • PASS_THROUGH:表示这个操作符仅仅传递背压类型,不做任何改变。例如defer 操作符,这个操作符支持的背压类型取决于Callable产生的Publisher
  • FULL:表示这个操作符支持完全的背压,协调上下游关系
  • SPECIAL:表示这个操作符支持的背压类型由方法上的文档说明
  • UNBOUNDED_IN:表示这个操作符会向上游请求 Long.MAX_VALUE,并协调下游
  • ERROR:表示如果下游没有足够数量的request,上游发射了超额的数据,这个操作符会抛出一个MissingBackpressureException
  • NONE:表示不处理背压

上面这些字面解释看起来还是很绕的,尤其是对于没有阅读过相关源码的读者。我们也不必一次性全部弄明白,后续会慢慢讲清楚所有。

走进create源码

前文中有提到过,Rx2 收回了create方法的权限,使开发者自定义的create也能够正确的支持背压。而实现的方式就是通过额外提供一个BackpressureStrategy参数。也因此,create的方法注解中 BackpressureSupportSPECIAL

FlowableCreate

抛开Rx2提供的 plugin 不谈,本质上就是用 create 传进来的2个参数创建了FlowableCreate这个类。
根据传入的BackpressureStrategy生成不同的Emitter对象。并遵循一致的编程约定,先调用 onSubscribe,随后将Emitter传递给FlowableOnSubscribe用来发射数据。

@Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
    }
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
    }
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
    }
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
    }
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
    }
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

BackpressureStrategy

Rx2 大量的类通过继承AtomicLong来表示计算个数与发射个数,请求一个+1,发射一个-1。并且在 Rx2 中 Long.MAX_VALUE 有特殊含义,表示无限的数据。即,如果 request(Long.MAX_VALUE),即使发射了数据也不会减少自身的数值。

这里所有的 Emitter 都继承了基类BaseEmitter,并提供一些公共方法如setDisposable/setCancellable/requested/serialize等。然后根据各自的背压策略,实现相应的逻辑,下面分别介绍。

MISSING

MISSING即没有背压,我们看 onNext 函数会发现,每调用一次就会传递给下游的 subscriber.onNext,空指针则onError

@Override
public void onNext(T t) {
    if (isCancelled()) {
        return;
    }

    if (t != null) {
        actual.onNext(t);
    } else {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }

    for (;;) {
        long r = get();
        if (r == 0L || compareAndSet(r, r - 1)) {
            return;
        }
    }
}

上面的代码是2.1.2版本的源码,笔者认为这里有一处 BUG 。即在自减的时候,没有检查 Long.MAX_VALUE 的情况,导致在request(Long.MAX_VALUE)后,发射数据时依然会不断自减,这是与一致的设计思路相悖的。反观下面 DROP 与 BUFFER 相关的 Emitter 处理时,则直接调用了BackpressureHelper.produced(this, 1),在里面会有 Long.MAX_VALUE 的判断。

虽然有点小 BUG,但是实际中除了在requested()函数中会出错外,不会影响正常的执行流。且一般开发者也不会使用Emitter.requested()函数。

虽然 MISSING 不支持背压,但是没关系,我们可以通过操作符来弥补。

这些操作符结合使用 MISSING 的 create 方法,使得原本不支持背压的 Flowable 支持背压。

当然我们也大可不必这样麻烦,既然要使用 buffer、drop 或者 latest,使用下面的策略即可。除非我们需要那些操作符提供的额外功能。

ERROR

ERROR则和最开始的BackpressureSupport.ERROR表现一致。

下面这块代码是NoOverflowBaseAsyncEmitter,会被 ERROR 和 DROP 对应的Emitter继承。逻辑也很简单,即请求数如果还大于0,则向下发射并将请求数减1,否则走onOverflow()方法。

@Override
public final void onNext(T t) {
    if (isCancelled()) {
        return;
    }

    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }

    if (get() != 0) {
        actual.onNext(t);
        BackpressureHelper.produced(this, 1);
    } else {
        onOverflow();
    }
}

而 ERROR 对应的实现则很简单了,不在赘述。

@Override
void onOverflow() {
    onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}

DROP

DROP即直接丢弃超额的数据,体现在代码中就非常简单。

@Override
void onOverflow() {
    // nothing to do
}

BUFFER 与 LATEST

这俩之所以放到了一起,是因为 BUFFER 与 LATEST 本质上都是缓存了数据,细节上的区别就是,BUFFER 是缓存了所有数据,而 LATEST 只保留了最近的一个 onNext 数据。

体现在代码中这两者最主要的区别就是一个用了队列来缓存,一个用了AtomicReference 来维持最后一个未被消费的数据。

就挑 BUFFER 来说, onNext 就是将数据扔进队列,而后尝试消费数据即调用drain()。onError 与 onComplete 则是将 结束标识置为 true ,并保留异常,然后依然也是在drain()中消费该消息。
在/onNext/onError/onComplete/onRequested时,都会调用drain()来消费队列中的数据。

@Override
public void onNext(T t) {
    if (done || isCancelled()) {
        return;
    }

    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    queue.offer(t);
    drain();
}

drain中做的事就比较复杂了,为了保证线程安全,首先通过一个AtomicInteger来确保只有一个线程可以进入for(;;)循环。
在 for 循环中,不断的消费队列中的数据,如果队列为空则检查结束标识是否为 true,是的话则发射 onComplete 或者 onError 。

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    final Subscriber<? super T> a = actual;
    final SpscLinkedArrayQueue<T> q = queue;

    for (;;) {
        long r = get();
        long e = 0L;

        while (e != r) {
            if (isCancelled()) {
                q.clear();
                return;
            }

            boolean d = done;

            T o = q.poll();

            boolean empty = o == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    error(ex);
                } else {
                    complete();
                }
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(o);

            e++;
        }

        if (e == r) {
            if (isCancelled()) {
                q.clear();
                return;
            }

            boolean d = done;

            boolean empty = q.isEmpty();

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    error(ex);
                } else {
                    complete();
                }
                return;
            }
        }

        if (e != 0) {
            BackpressureHelper.produced(this, e);
        }

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

这里请大家留意一个编程的套路,在绝大多数队列消费的场景里, Rx2 中都是使用了下面的方式。这也是我们可以积累使用的。通过这种方式可以保证for循环里的代码是单线程执行的,且如果执行期间有一次或多次新的调用drain(),会导致重新走一遍包含注释处的代码,确保数据可以正确的消费发射。

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    for (;;) {
        
        // 消费队列, 发射数据

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

小结

笔者在介绍过程中已经省略了很多细枝末节,不免显得知识有些分散,结合源码阅读效果更佳。没想到一个小小的 create 也包含这么多的玄机。

我相信通过阅读这篇文章,读者们写 create 的时候应该可以做到结合实际场景选择正确的BackpressureStrategy

有了 create 便从此开启 Rx2 万里长征第一步。下一篇,我们将会介绍 Rx2 的线程调度相关的操作符及其实现,敬请期待。

最后欢迎关注笔者的微信公众号,每一篇新的博文都将会在第一时间发布在公众号上。