RxJava用法(2)
Rx背压
问题:在异步情况中,被观察者发送事件的速率和观察者接收事件的速率不一样,会导致缓冲区溢&oom
对策:背压策略(back pressure strategy)————控制事件流速
原理:
- 反馈控制:被观察者根据观察者接收事件的能力发送事件
- 响应式拉取:根据观察者自身情况接收事件
- 缓冲区:对超出缓冲区的事件进行丢弃,覆盖,报错
具体使用:Flowable
在flowable用法中,被观察者变成了Flowable类,观察者变成了Subscriber类,其他用法和规则不变
- 响应式拉取(控制观察者)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45Flowable.create(new FlowableOnSubscribe<Integer>() {
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d("TAG", "发送事件 1");
emitter.onNext(1);
Log.d("TAG", "发送事件 2");
emitter.onNext(2);
Log.d("TAG", "发送事件 3");
emitter.onNext(3);
Log.d("TAG", "发送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
.subscribe(new Subscriber<Integer>() {
// 步骤2:创建观察者 = Subscriber & 建立订阅关系
public void onSubscribe(Subscription s) {
// 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription
// 相同点:Subscription参数具备Disposable参数的作用,
// 即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接
// 不同点:Subscription增加了void request(long n)
// 作用:决定观察者能够接收多少个事件
// 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)
// 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
Log.d("TAG", "onSubscribe");
s.request(3);
/**如果在异步的情况中request()没有参数,则认为观察者不接受事件
* 被观察者可以继续发送事件存到缓存区(缓存区大小=128)
* */
}
public void onNext(Integer integer) {
Log.d("TAG", "接收到了事件" + integer);
}
public void onError(Throwable t) {
Log.w("TAG", "onError: ", t);
}
public void onComplete() {
Log.d("TAG", "onComplete");
}
}); - 反馈控制(控制被观察者)
在反馈控制中,同步和异步是不同的,先介绍反馈控制的同步实现方法
- 反馈控制实现(同步)在被观察者发送的时候,我们拿到emitter.requested()的值,这个值和观察者s.request(10)设置的值相同,为10,观察者设置的能接收多少事件,被观察者就发送多少事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33Flowable.create(new FlowableOnSubscribe<Integer>() {
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 调用emitter.requested()获取当前观察者需要接收的事件数量
long n = emitter.requested();
Log.d(TAG, "观察者可接收事件" + n);
// 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件
for (int i = 0; i < n; i++) {
Log.d(TAG, "发送了事件" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 设置观察者每次能接受10个事件
s.request(10);
}
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
public void onComplete() {
Log.d(TAG, "onComplete");
}
}); - 反馈控制实现(异步)
在异步的情况下emitter.requested()的值和观察者s.request()的值不相同,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度
被观察者FlowableEmitter.requested()的返回值由RxJava内部决定,并且只会返回128,96,0三种情况
大概就是开始request(128),当缓冲区<=32的时候,request(96)
1 | // 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0 |
- 被观察者:一共需要发送500个事件,真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
- 观察者:每次接收事件数量 = 48(点击按钮)
- Flowable发送500个事件,开始RxJava内部会设置FlowableEmitter.requested()返回128,没有点击btn,所以Subscriber默认接收的数量为0,这128个事件被存放到缓冲区,Flowable还有(500-128=372)个事件
- 点击btn,Subscriber对象可以接收48个事件,此时缓存区是(128-48=80),Subscriber对象处理了48个对象,Flowable还是(500-128=372)个事件
- 再次点击,Subscriber可以接收(48+48=96)个事件,缓冲区(128-48-48=32), 此时缓冲区满足<=32条件,RxJava内部调用request(96),Flowable(500-128-96=276),缓存区(32+96=128),剩下的以此类推,就完整的完成了反馈控制的异步展示
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 泉子的理想乡!
评论