前言

本系列文章适用于已经了解 RxJava 的读者,深入贯彻其原理,加深对其的认识。如果从未了解过 RxJava 的读者们,建议先熟悉

RxJava 0.x

RxJava 最早是 Netflix 参照微软的 Rx.Net,在 Java 上实现一套类似的库,0.x 其实就是社区内部迭代开发的时代。

在 0.x 的迭代过程中,API 还不稳定,在长期的变更中,逐步完善了 Observable,Publisher,Subscriber,Scheduler 等接口以及大量的操作符。

Reactive Streams

在开发 RxJava 早期版本的过程中,开发组也参与了制定 Reactive Streams 规范。

但是 RxJava1 并没有遵循这个规范,因为考虑到下面几个原因:

  1. Subscriber.onComplete 和 当时的 Observer.onCompleted 相差一个字母,导致 API 不能兼容。
  2. RxJava 中的 Subscription 和 RS 中的有些细微的区别,它不能在没有较大变动的情况下实现 request(n) 方法,因此 RxJava 引入了 Producer 这个接口,并混在了 Subscriber 中。
  3. RS 彼时不是稳定版本,也在持续修订中。

因此 RxJava 开发组决定在 2.0 版本中正式支持 RS 规范,在 1.x 版本中实现类似的机制,而不像在 2.0 中直接使用 RS 的接口。

Backpressure

由于 RS 的影响,在 0.20.0-RC1 中 RxJava 第一次引入 Backpressure 的概念,从此 RxJava 变成了一个让人爱恨交织的库。事实上 RxJava 开发组也曾表示,在 RxJava 早期版本中,在 Observable 混入 Backpressure 是一个重大的失误。

  • 由于 Backpressure 是在中间版本引入的,因此部分操作符支持,部分操作符不支持,导致对使用者有些混乱和不友好。
  • 在热数据源中(如点击事件),是无法被正确地 backpressured ,从而导致经常出现意外的 MissingBackpressureException

事实上正如现在 2.x 中做的那样,正确的做法是应该把模块分为 支持 Backpressure 和不支持的两类。在 io.reactivex.Observable 中彻底移除了 Backpressure,而 io.reactivex.Flowable 则遵循 RS 规范支持 Backpressure。

RxJava 1.x

经过两年多的迭代,RxJava 在 14 年 9 月发布了 1.0.0 正式版。
上面有提到,其实 1.x 是一个类似 RS 的版本,但是不依赖 RS 的接口。同时对比 0.x,做了如下的更改。

依赖方式
groupId artifactId
0.x com.netflix.rxjava rxjava-core
1.x io.reactivex rxjava
拆分项目

在 RxJava 1.0.0 发布之际,把 JVM 上其他语言的实现和子工程都独立出去了,而在 RxJava 库中只保留了 Java 版的实现。
如:

其他

新增和废弃了部分操作符,修复了大量的 BUG。

RxJava 2.x

RxJava 2.0.0 正式版发布于 2016 年 9 月底。
笔者也曾写过一篇 《浅谈RxJava与2.0的新特性》,不过写那篇文章时还在版本还在 2.0.0-RC1,以现在的角度看起来不免显得不够全面,因此最好的理解方式还是看官方的 Wiki

类型

1.x 2.x
Single/Completable Maybe 0 或 1 数据源
Observable Flowable 多数据源
Subject Processor 热数据源

正如上面所说的,RxJava2 遵循了 RS 规范,其冷数据源真正实现的类型就是 Flowable,热数据源的实现则在io.reactivex.processors包中。
同时也把 Observable 中旧的 Backpressure 彻底移除,因此在 RxJava2 中使 用 Observable 再也不会抛出MissingBackpressureException

Observable 与 Flowable

在 RxJava2 中, Flowable 和 Observable 虽然实现的代码复用了一部分,但是机制却大相径庭。这里要涉及到数据源的三种模型:

数据源
  • 响应式推:如鼠标点击事件。此时生产者无条件产生数据,消费者负责配合生产者。
  • 同步拉:如Iterable的迭代。此时消费者提出要求,生产者配合消费者下发,数据源是确定的。
  • 异步拉:如FutureProcessor。消费者提出要求,生产者据此下发数据,但是数据到来不确定。
Observable

在 Observable 中,消费者是无权提出要求的,即数据都是生产者提供的,消费者只能被动接受。虽然数据源不确定,但是对消费者是透明的,只能被动等待数据。由于 Observable 已经彻底移除了 Backpressure,因此对于消费速度和生产速度不协调时,中间操作符可能会创建 Buffer(如 observeOn)来缓存数据。因此数据在无限的积累中可能会导致 OOM, 但不再会抛出MissingBackpressureException

Flowable

在 Flowable 中,消费者通过Subscription主动的向生产者提出自己需求的数量,上游据此发射数据。从而就有生产和消费的矛盾。如果是数据源是响应式推或者异步拉时,可能会导致MissingBackpressureException

举例

这么说有点抽象,我们举个例子。Subject / Processor 就是对应异步拉的数据源,也是热数据源。消费者在订阅他们后,无论是否可以 request,数据的是否产生以及产生速度也是未知的。
在接受到onNext时:

  • Subject 直接转发给下游
  • Prosessor 检查下游的 request 数目,如果少于已经发送的数目,则抛出MissingBackpressureException

因此使用Prosessor稍有不慎就会出错哦。当然在实际使用中Flowable.subscribe()时,内置的 Subscriber 通常都会在 onSubscribe 时直接向生产者request(Long.MAX_VALUE),在 RxJava2 Long.MAX_VALUE 是一个特殊值,意味着无限流。大家可参见 subscribers

选择

对于 Observable 与 Flowable 的选择官方也有提示:

  • 选择 Observable:

    • 处理不超过 1000 个数据时,因为数据很少,一般不会触发 OOM
    • 处理 GUI 或者点击事件时,因为这些事件是异步推的,很难被 backpressured 也一般不这样做
    • 数据源本质上是同步的,但是平台不支持 Java Stream API 或者你想用 RxJava 丰富的操作符,因为 Observable 比 Flowable 的性能更好
  • 选择 Flowable:

    • 处理 10k+ 数据且数据源是可控的
    • 基于拉的且阻塞的数据源

拆分之争

事实上 RxJava 自 16年 开始社区一度有讨论是否要把 RxJava2 拆分成多个库,因为:

  • 区分是否支持 Backpressure
  • 将通用的代码独立出去,如 SchedulerSimpleQueue
  • 减少包体积的大小

但是最终经过讨论后还是放弃了:

  • 使用 Proguard 来压缩 RxJava2 的效果非常好,基本上是用多少保留多少代码
  • 如果拆分开,强行逼迫使用者需要了解 Backpressure 的概念,且增加了使用方的麻烦,因为他们需要区分自己到底需要哪些库
  • 多个不同版本子库的组合可能会导致兼容问题
  • 拆分后,Observable 和 Flowable 互不依赖,互转需要使用静态方法,打断了链式操作
  • ...

结语

如果您作为一个 Android 开发者,正在纠结于 RxJava 带来的好处和他的庞大的体积,那么您可以打消这个顾虑了,只要正确地配置了 Proguard,RxJava 对您包体积大小的影响微乎其微。反过来说,如果没有配置 Proguard,那么是否引入 RxJava 确实是值得思考的一件事。

事实上,做一个 Android 开发者,笔者认为 RxJava 简直是为 Android 而生的,天生响应式的事件与 RxJava 的结合能大幅提供您的工作效率。前提是您有一些函数式编程的思维,能把流程拆解成一个个操作符。

最后,欢迎关注笔者公众号,有问题欢迎后台留言: