热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:深入讲解RxJava响应式编程框架,背压问题的几种应对模式

篇首语:本文由编程笔记#小编为大家整理,主要介绍了深入讲解RxJava响应式编程框架,背压问题的几种应对模式相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了深入讲解RxJava响应式编程框架,背压问题的几种应对模式相关的知识,希望对你有一定的参考价值。



背压

本节首先介绍什么是背压(Backpressure)问题,然后介绍背压问题的几种应对模式。




 什么是背压问题

当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。

一个存在背压问题的演示实例代码如下:

package com.crazymaker.demo.rxJava.basic;
//省略import
@Slf4j
public class BackpressureDemo {
/**
*演示不使用背压
*/

@Test
public void testNoBackpressure() throws InterruptedException {
//被观察者(主题)
Observable observable = Observable.create(
new Observable.OnSubscribe() {
@Override
public void call(Subscribersuper String> subscriber) {
//循环10次
for (int i = 0;i<10 ; i++) {
log.info("produce ->" + i);
subscriber.onNext(String.valueOf(i));
}
}
});
//观察者
Action1 subscriber = new Action1() {
public void call(String s){
try {
//每消费一次间隔50毫秒
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("consumer ->" + s);
}
};
//订阅:observable与subscriber之间依然通过subscribe()进行关联
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(subscriber);
Thread.sleep(Integer.MAX_VALUE);
}
}

在实例代码中,observable发射操作执行在一条通过Schedulers.io()调度器获取的IO线程上,而观察者subscriber的消费操作执行在另一条通过Schedulers.newThread()调度器获取的新线程上。observable流不断发送数据,累积发送10次;观察者subscriber每隔50毫秒接收一条数据。

运行上面的演示程序后,输出的结果如下:

17:56:17.719 [Rxioscheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->0
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->1
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->2
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->3
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->4
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->5
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->6
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->7
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->8
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->9
17:56:17.774 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->0
17:56:17.824 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->1
17:56:17.875 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->2
17:56:17.925 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->3
17:56:17.976 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->4
17:56:18.027 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->5
17:56:18.078 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->6
17:56:18.129 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7
17:56:18.179 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->8
17:56:18.230 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->9

上面的程序有一个特点:生产者observable弹射数据的速度大于下游消费者subscriber接收处理数据的速度,但是由于数据量小,因此上面的程序运行起来没有出现问题。

简单修改一下生产者,将原来的弹射10条改成无限制地弹射,代码如下:

//被观察者(主题)
Observable observable = Observable.create(
new Observable.OnSubscribe() {
@Override
public void call(Subscribersuper String> subscriber) {
//无限制地循环
for (int i = 0; ; i++) {
//log.info("produce ->" + i); subscriber.onNext(String.valueOf(i));
}
}
});

再次运行该演示程序后,抛出的异常如下:

Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext
(OperatorObserveOn.java:160)
at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.onNext
(OperatorSubscribeOn.java:74)
at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call
(BackpressureDemo.java:24)
at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call
(BackpressureDemo.java:19)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.call
(OperatorSubscribeOn.java:100)
at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call
(CachedThreadScheduler.java:230)
... 9 more

异常原因:由于上游observable流弹射数据的速度远远大于下游通过subscriber接收的速度,导致observable用于暂存弹射数据的队列空间耗尽,造成上游数据积压。


 背压问题的几种应对模式

如何应对背压问题呢?在创建主题时可以使用Observable类的一个重载的create方法设置具体的背压模式,该方法的源代码如下:

public static Observable create(Action1> emitter, Emitter.BackpressureMode backpressure) {
return unsafeCreate(new OnSubscribeCreate(emitter, backpressure));
}

此方法的第二个参数用于指定一种背压模式。背压模式有多种,比较常用的有“最近模式”
Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上,那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃。

使用“最近模式”背压,改写4.8.1节的测试用例,代码如下:

/**
*演示使用“最近模式”背压
*/

@Test
public void testBackpressure() throws InterruptedException {
//主题实例,使用背压
Observable observable = Observable.create(
new Action1String>> () {
@Override
public void call(Emitter<String> emitter) {
//无限循环
for (int i = 0; ; i++) {
//log.info("produce ->" + i);
emitter.onNext(String.valueOf(i));
}
}
}, Emitter.BackpressureMode.LATEST);
//订阅者(观察者)
Action1<String> subscriber = new Action1<String>() {
public void call(String s) {
try {
//每消费一次间隔50毫秒
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("consumer ->" + s);
}
};
//订阅: observable与subscriber之间依然通过subscribe()进行关联
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(subscriber);
Thread.sleep(Integer.MAX_VALUE);
}

运行这个演示程序,部分输出的结果节选如下:

18:51:54.736 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->0
18:51:54.745 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->1
//省略部分输出
18:51:55.217 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->123
18:51:55.220 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->124
18:51:55.224 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->125
18:51:55.228 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->126
18:51:55.232 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->127
18:51:55.236 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337652
18:51:55.240 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337653
18:51:55.244 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337654
//省略部分输出
18:51:55.595 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337747
18:51:55.598 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->14161628

从输出的结果可以看到,上游主题连续不断地弹射,下游订阅者在接收完127后直接跳到了7337652,其间弹射出来的几百万数据(相对旧一点的数据)就直接被丢弃了。

除了
Emitter.BackpressureMode.LATEST“最近模式”外,RxJava在Emitter接口中通过一个枚举常量定义了以下几种背压模式:

enum BackpressureMode {
/**
*No backpressure is applied(无背压模式)
*可能导致rx.exceptions.MissingBackpressureException异常
*或者IllegalStateException异常
*/

NONE,
/**
*如果消费者跟不上,就抛出rx.exceptions.MissingBackpressureException异常
*/

ERROR,
/**
*缓存所有的onNext方法弹射出来的消息,等待消费者慢慢地消费
*/

BUFFER,
/**
*如果下游消费跟不上,就丢弃onNext方法弹射出来的新消息
*/

DROP,
/**
*如果消费者跟不上,就丢掉旧的消息,缓存onNext方法弹射出来的新消息
*/

LATEST
}

对于以上RxJava背压模式,介绍如下:

(1)BackpressureMode.DROP:在这种模式下,Observable主题使用固定大小为128的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。

(2)BackpressureMode.LATEST:这种模式与BackpressureMode.DROP类似,并且Observable主题也使用固定大小为128的缓冲区。BackpressureMode.LATEST的缓存策略不同,使用最新的弹出元素替换缓冲区缓存的元素。当消费者可以处理下一个元素时,它收到的是Observable最近一次弹出的元素。

(3)BackpressureMode.NONE和BackpressureMode.ERROR:在这两种模式中发送的数据不使用背压。如果上游observable主题弹射数据的速度大于下游通过subscriber接收的速度,造成上游数据积压,就会抛出
MissingBackpressureException异常。

(4)BackpressureMode.BUFFER:在这种模式下,有一个无限的缓冲区(初始化时是128),下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累,就会导致内存耗尽,抛出OutOfMemoryException异常。









  • 深入讲解RxJava响应式编程框架,背压问题的几种应对模式




  • 关注我,一个勤奋爱劳动的程序员












    关注作者微信公众号 —《Java架构师联盟》


    了解更多Java后端架构、python知识以及最新面试宝典












推荐阅读
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Android源码深入理解JNI技术的概述和应用
    本文介绍了Android源码中的JNI技术,包括概述和应用。JNI是Java Native Interface的缩写,是一种技术,可以实现Java程序调用Native语言写的函数,以及Native程序调用Java层的函数。在Android平台上,JNI充当了连接Java世界和Native世界的桥梁。本文通过分析Android源码中的相关文件和位置,深入探讨了JNI技术在Android开发中的重要性和应用场景。 ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • C语言注释工具及快捷键,删除C语言注释工具的实现思路
    本文介绍了C语言中注释的两种方式以及注释的作用,提供了删除C语言注释的工具实现思路,并分享了C语言中注释的快捷键操作方法。 ... [详细]
  • 本文介绍了一种划分和计数油田地块的方法。根据给定的条件,通过遍历和DFS算法,将符合条件的地块标记为不符合条件的地块,并进行计数。同时,还介绍了如何判断点是否在给定范围内的方法。 ... [详细]
  • 本文介绍了多因子选股模型在实际中的构建步骤,包括风险源分析、因子筛选和体系构建,并进行了模拟实证回测。在风险源分析中,从宏观、行业、公司和特殊因素四个角度分析了影响资产价格的因素。具体包括宏观经济运行和宏经济政策对证券市场的影响,以及行业类型、行业生命周期和行业政策对股票价格的影响。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • 本文介绍了解决二叉树层序创建问题的方法。通过使用队列结构体和二叉树结构体,实现了入队和出队操作,并提供了判断队列是否为空的函数。详细介绍了解决该问题的步骤和流程。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
author-avatar
nilue1_203
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有