rxjava是一个异步框架,功能和handler类似,特点是链式调用,逻辑简单。

内容

  • 观察者模式
  • rxjava异步使用
  • 操作符介绍
  • rxjava背压

观察者模式

java中的观察者模式,主要有三个关键词需要记住,被观察者(Observable),订阅(subscribe),观察者(Observer)。
核心思想:被观察者和观察者通过订阅产生一种关系,当被观察者发生一些改变,通知观察者,观察者对应做出相应的回应。
举例:小说是被观察者,读者是观察者,小说和读者之前通过subscribe产生订阅关系,小说更新了,通知读者去买新小说。

Rx异步使用

创建被观察者(Observable)

1
2
3
4
5
6
7
8
9
10
Observable<String> story = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for (int i=1;i<4;i++){
Log.d("TAG","我是小说,我更新了第"+i+"季");
emitter.onNext(i+"");
}
emitter.onComplete();
}
});

调用Observable的create(),传入ObservableOnSubscribe对象,重写ObservableOnSubscribe对象的subscribe(),在subscribe()中,有一个ObservableEmitter对象,这是一个发射器,调用发射器的onNext(),把被观察者(Observable)的事件发送出去。

创建观察者(Observer)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observer<String> reader = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG","我是读者,我和小说订阅了,小说变动后会通知我");
}
@Override
public void onNext(String value) {
Log.d("TAG","我是读者,我拿到了小说的新版本:"+value+"版本");
}
@Override
public void onError(Throwable e) {
Log.d("TAG","我是读者,拿小说的时候出现了问题,后面的不拿了");
}
@Override
public void onComplete() {
Log.d("TAG","我是读者,小说的新版本被我拿完了");
}
};

创建Observer,直接new一个Observer重写他的四个方法:

  • onSubscribe():当Observer和Observable订阅的时候调用;
  • onNext():对Observable中的emitter.onNext()发射出来的事件进行处理;
  • onError():不用多说,坏了;
  • onComplete():Observable发送来的事件全部处理完成,结束调用;

注意onError()和onComplete()是互斥的,只会调用一个

订阅

1
2
3
story.observeOn(AndroidSchedulers.mainThread());
story.subscribeOn(Schedulers.io());
story.subscribe(reader);

异步实现: subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

  • story.subscribeOn(Schedulers.io());发射事件线程是io线程
  • story.observeOn(AndroidSchedulers.mainThread());处理事件线程是main

本来应该是用户订阅小说,但RxJava为了保证流式API调用风格,改为了小说订阅用户,可以理解为小说记录了一个用户,更新时小说通知用户。

运行结果:

1
2
3
4
5
6
7
8
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是读者,我和小说订阅了,小说变动后会通知我
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是小说,我更新了第1
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是读者,我拿到了小说的新版本:1版本
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是小说,我更新了第2
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是读者,我拿到了小说的新版本:2版本
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是小说,我更新了第3
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是读者,我拿到了小说的新版本:3版本
2022-03-02 16:57:22.636 6289-6289/vip.izumi.androidframework D/TAG: 我是读者,小说的新版本被我拿完了

上面的三个步骤可以合起来用链式调用的方法写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for (int i=1;i<4;i++){
Log.d("TAG","我是小说,我更新了第"+i+"季");
emitter.onNext(i+"");
}
emitter.onComplete();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("TAG","我是读者,我和小说订阅了,小说变动后会通知我");
}
@Override
public void onNext(String value) {
Log.d("TAG","我是读者,我拿到了小说的新版本:"+value+"版本");
}
@Override
public void onError(Throwable e) {
Log.d("TAG","我是读者,拿小说的时候出现了问题,后面的不拿了");
}
@Override
public void onComplete() {
Log.d("TAG","我是读者,小说的新版本被我拿完了");
}
});

效果和上面一模一样,这种写法逻辑上更加清晰一点。

rxjava操作符使用

  • 创建操作符
  • 变换操作符
  • 合并操作符
  • 功能操作符

创建操作符

13b3c1cc88f738b6d.png
作用:创建Observable,发送事件

  • just()
    1
    Observable.just("1","2","3","4");
  • fromArray()
    1
    2
    Integer[] numbers = {1,2,3,4};
    Observable.fromArray(numbers);
  • fromIterable()
    1
    2
    3
    4
    5
    6
    ArrayList<String> arrayList = new ArrayList<>();
    arrayList.add("1");
    arrayList.add("2");
    arrayList.add("3");
    arrayList.add("4");
    Observable.fromIterable(arrayList);
  • never():不发送任何事件
  • empty():只发送Complete事件,即emitter.complete()
  • error():发送一个异常,传入error()中

延时创建:定时操作&周期性操作

  • defer():直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
  • timer(): 延迟指定时间后,发送1个数值0,默认是在新线程中执行
    1
    Observable.timer(2, TimeUnit.SECONDS) 
    本质 = 延迟指定时间后,调用一次 onNext(0)
  • interval():
    1
    2
    3
    4
    // 参数1 = 第1次延迟时间;
    // 参数2 = 间隔时间数字;
    // 参数3 = 时间单位;
    Observable.interval(3,1,TimeUnit.SECONDS)
    24ad6a50cc187ca69.png

变换操作符

d5e62334a18c9c4029f3f90b239c03e88eca85955f20d89d.png

合并操作符

d07c3a6e04ed2dc6dcc7a986dd8cf8e6dcbf8522e7da3ad3.png

功能操作符

7ec65b43d72c29c1cbcd1b1dcc12ec7e4cbd034d15fb0aa5.png