Rx背压

问题:在异步情况中,被观察者发送事件的速率和观察者接收事件的速率不一样,会导致缓冲区溢&oom
对策:背压策略(back pressure strategy)————控制事件流速
原理:

  • 反馈控制:被观察者根据观察者接收事件的能力发送事件
  • 响应式拉取:根据观察者自身情况接收事件
  • 缓冲区:对超出缓冲区的事件进行丢弃,覆盖,报错

具体使用:Flowable
在flowable用法中,被观察者变成了Flowable类,观察者变成了Subscriber类,其他用法和规则不变

  1. 响应式拉取(控制观察者)
    f2eed5fd467f168f42777e41ca67ed7e47713b9b03a12189.png
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    Log.d("TAG", "发送事件 1");
    emitter.onNext(1);
    Log.d("TAG", "发送事件 2");
    emitter.onNext(2);
    Log.d("TAG", "发送事件 3");
    emitter.onNext(3);
    Log.d("TAG", "发送完成");
    emitter.onComplete();
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
    .observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
    .subscribe(new Subscriber<Integer>() {
    // 步骤2:创建观察者 = Subscriber & 建立订阅关系
    @Override
    public void onSubscribe(Subscription s) {
    // 对比Observer传入的Disposable参数,Subscriber此处传入的参数 = Subscription
    // 相同点:Subscription参数具备Disposable参数的作用,
    // 即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接
    // 不同点:Subscription增加了void request(long n)
    // 作用:决定观察者能够接收多少个事件
    // 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)
    // 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
    Log.d("TAG", "onSubscribe");
    s.request(3);
    /**如果在异步的情况中request()没有参数,则认为观察者不接受事件
    * 被观察者可以继续发送事件存到缓存区(缓存区大小=128)
    * */
    }
    @Override
    public void onNext(Integer integer) {
    Log.d("TAG", "接收到了事件" + integer);
    }
    @Override
    public void onError(Throwable t) {
    Log.w("TAG", "onError: ", t);
    }
    @Override
    public void onComplete() {
    Log.d("TAG", "onComplete");
    }
    });
  2. 反馈控制(控制被观察者)
    a190ac8a52060e3f5733f00af68ed91166383dd1210f204c.png
    在反馈控制中,同步和异步是不同的,先介绍反馈控制的同步实现方法
  • 反馈控制实现(同步)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    // 调用emitter.requested()获取当前观察者需要接收的事件数量
    long n = emitter.requested();
    Log.d(TAG, "观察者可接收事件" + n);
    // 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件
    for (int i = 0; i < n; i++) {
    Log.d(TAG, "发送了事件" + i);
    emitter.onNext(i);
    }
    }
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    // 设置观察者每次能接受10个事件
    s.request(10);
    }
    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "接收到了事件" + integer);
    }
    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }
    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
    在被观察者发送的时候,我们拿到emitter.requested()的值,这个值和观察者s.request(10)设置的值相同,为10,观察者设置的能接收多少事件,被观察者就发送多少事件
  • 反馈控制实现(异步)
    在异步的情况下emitter.requested()的值和观察者s.request()的值不相同,即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度
    ca5d653b90f102cd659419bbceba05b7b060c19ea5bcd099.png
    被观察者FlowableEmitter.requested()的返回值由RxJava内部决定,并且只会返回128,96,0三种情况
    2a57fb9dbaa6cb693.png
    大概就是开始request(128),当缓冲区<=32的时候,request(96)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 被观察者:一共需要发送500个事件,但真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 观察者:每次接收事件数量 = 48(点击按钮)

Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());
boolean flag; //设置标记位控制
// 被观察者一共需要发送500个事件
for (int i = 0; i < 500; i++) {
flag = false;
// 若requested() == 0则不发送
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "不再发送");
flag = true;
}
}
// requested() ≠ 0 才发送
Log.d(TAG, "发送了事件" + i + ",观察者可接收事件数量 = " + emitter.requested());
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行
.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 初始状态 = 不接收事件;通过点击按钮接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

// 点击按钮才会接收事件 = 48 / 次
btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mSubscription.request(48);
// 点击按钮 则 接收48个事件
}

});
  • 被观察者:一共需要发送500个事件,真正开始发送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
  • 观察者:每次接收事件数量 = 48(点击按钮)
  1. Flowable发送500个事件,开始RxJava内部会设置FlowableEmitter.requested()返回128,没有点击btn,所以Subscriber默认接收的数量为0,这128个事件被存放到缓冲区,Flowable还有(500-128=372)个事件
  2. 点击btn,Subscriber对象可以接收48个事件,此时缓存区是(128-48=80),Subscriber对象处理了48个对象,Flowable还是(500-128=372)个事件
  3. 再次点击,Subscriber可以接收(48+48=96)个事件,缓冲区(128-48-48=32), 此时缓冲区满足<=32条件,RxJava内部调用request(96),Flowable(500-128-96=276),缓存区(32+96=128),剩下的以此类推,就完整的完成了反馈控制的异步展示
  • 缓冲区
    在创建Flowable的时候,会传入第二个参数,BackpresureStrategy.ERROR 直接传入参数即可
    da7eef3df0bedfd6d7c4c782b3cc617c1f2ab6a6c5523245.png
  • ERROR: 当缓冲区满了会抛出异常MissingBackpressureException
  • MISSING:当缓冲区满了会提示:QUEUE is full
  • BUFFER: 将缓存区大小设置成无限大(要注意内存,防止oom)
  • DROP:超过缓冲区的事件会被丢掉
  • LATEST: 只保留最后1个事件和第1到第128个事件,(一共129个)