热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度

篇首语:本文由编程笔记#小编为大家整理,主要介绍了浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度相关的知识,希望对你有一定的参考价

篇首语:本文由编程笔记#小编为大家整理,主要介绍了浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度相关的知识,希望对你有一定的参考价值。


一、创建操作:

1、观察者模式:
RxJava的世界里,我们有四种角色:

    Observable(被观察者)、Observer(观察者)
    Subscriber(订阅者)、Subject
    Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2、回调方法:
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法:




    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。






    • onError(Exception ex)
      当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。






    • onComplete
      正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。












3、添加依赖:


  compile ‘io.reactivex:rxandroid:1.1.0‘

  compile ‘io.reactivex:rxjava:1.1.0‘

4、create():

Observable.create(new Observable.OnSubscribe() {


@Override
public void call(Subscriber subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("RxJava");
subscriber.onCompleted();
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "OnNext=" + s);
}
});
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的,只是多了onStart()方法,作为异步调用之前的操作:


.subscribe(new Subscriber() {
@Override
public void onStart() {
Log.i(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "OnNext=" + o);
}
});
*****************************************************源码解析订阅**************************************

public final Subscription subscribe(Subscriber subscriber) {
return Observable.subscribe(subscriber, this);
}
private static Subscription subscribe(Subscriber subscriber, Observable observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.OnSubscribe== null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that‘s not the appropriate approach
* so I won‘t mention that in the exception
*/
}

// new Subscriber so onStart it
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));//捕获异常并回调onError()
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren‘t we throwing the hook‘s return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}

*****************************************************源码解析订阅**************************************

5、from():

String[] arrays={"Hello","RxJava"};
Observable.from(arrays)
.subscribe(new Subscriber() {
@Override
public void onStart() {
Log.i(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "OnNext=" + o);
}
});

*****************************************************源码解析订阅**************************************

public final class{
  ...
  //此处create()就不多做解释了,...call()...
  public final static Observable from(Iterable iterable) {        
    return create(new OnSubscribeFromIterable(iterable));    
  }
  ...
}

public final class OnSubscribeFromIterable implements OnSubscribe {
final Iterable is;
public OnSubscribeFromIterable(Iterable iterable) {
if (iterable == null) {
throw new NullPointerException("iterable must not be null");
}
this.is = iterable;
}
@Override
public void call(final Subscriber o) {
final Iterator it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer(o, it));//未执行完继续迭代
}
private static final class IterableProducer extends AtomicLong implements Producer {
/** */
private static final long serialVersiOnUID= -8730475647105475802L;
private final Subscriber o;
private final Iterator it;
private IterableProducer(Subscriber o, Iterator it) {
this.o = o;
this.it = it;
}
@Override
public void request(long n) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastpath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
slowpath(n);
}
}
void slowpath(long n) {
// backpressure is requested
final Subscriber o = this.o;
final Iterator it = this.it;
long r = n;
while (true) {
/*
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long numToEmit = r;
while (true) {if (o.isUnsubscribed()) { return;} else if (it.hasNext()) { if (--numToEmit >= 0) { o.onNext(it.next()); } else break;} else if (!o.isUnsubscribed()) { o.onCompleted(); return;} else { // is unsubscribed return;}
}
r = addAndGet(-r);
if (r == 0L) {// we‘re done emitting the number requested so// returnreturn;
}
}
}
void fastpath() {
// fast-path without backpressure
final Subscriber o = this.o;
final Iterator it = this.it;
while (true) {
if (o.isUnsubscribed()) {return;
} else if (it.hasNext()) {o.onNext(it.next());
} else if (!o.isUnsubscribed()) {o.onCompleted();return;
} else {// is unsubscribedreturn;
}
}
}
}
}

*****************************************************源码解析订阅**************************************

6、just():


Observable.just("Hello","RxJava")

  .subscribe(new Observer() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onNext(String s) {

      Log.i("wxl", "OnNext=" + s);

    }

  });

*****************************************************源码解析订阅**************************************

public final static Observable just(T t1, T t2) { //然而源码解析完之后你会觉得 同上from()
return from(Arrays.asList(t1, t2));
}

*****************************************************源码解析订阅**************************************

二、变换操作:(重点:当然还是要先创建被观察者)
1、Map(): (多输入,单输出
的概念,用代理模式去理解map()方法执行过程,简单说就是Observable和OnSubscribe被新的取代了)


Observable.just("Hello", "RxJava")

  .map(new Func1() {

    @Override

    public String call(String s) {

      return s.toUpperCase();

    }

  }).subscribe(new Subscriber() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

 

    @Override

    public void onError(Throwable e) {

 

    }

 

    @Override

    public void onNext(String s) {

      Log.i("wxl", "OnNext=" + s);

    }

});

*****************************************************源码解析订阅**************************************

public class Observable {
...
  //订阅,跟上面一样
private static Subscription subscribe(Subscriber subscriber, Observable observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.OnSubscribe== null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that‘s not the appropriate approach
* so I won‘t mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//发射OnSubscrible中的call();注意,此时已替换了,用代理思维去理解。
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren‘t we throwing the hook‘s return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
public final Observable lift(final Operator operator) {
return new Observable(new OnSubscribe() { //重新创建了新的Observable和OnSubscribe
@Override
public void call(Subscriber o) {
try {Subscriber st = hook.onLift(operator).call(o);//回调替换的部分逻辑try { // new Subscriber created and being subscribed with so ‘onStart‘ it st.onStart(); onSubscribe.call(st);//发射新的} catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e);}
} catch (Throwable e) {Exceptions.throwIfFatal(e);// if the lift function failed all we can do is pass the error to the final Subscriber// as we don‘t have the operator available to uso.onError(e);
}
}
});
}
...
}

*****************************************************源码解析订阅**************************************

2、flatMap():


同上差不多,可以看做是扁平化的一种map(二次转换)

 比较flatMap与map:

假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
  @Override
  public void onNext(Student student) {
    List courses = student.getCourses();
    for (int i = 0; i       Course course = courses.get(i);
   Log.d(tag, course.getName());
   }
  }
...
};
Observable.from(students)
  .subscribe(subscriber);
而flatMap:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
  @Override
  public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
  .flatMap(new Func1>() {
    @Override
    public Observable call(Student student) {
  return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
3、Filter():

Observable.just(4, 2, 1, 7, 5)
.filter(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
})
.subscribe(new Observer() {
@Override
public void onCompleted() {
Log.i("wxl", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("wxl", "OnNext=" + integer);
}
});

***************************************************源码解析****************************************************************

public final Observable filter(Func1 predicate) {
return lift(new OperatorFilter(predicate));
}

public final class OperatorFilter implements Operator {
private final Func1 predicate;
public OperatorFilter(Func1 predicate) {
this.predicate = predicate;
}
@Override
public Subscriber call(final Subscriber child) {
return new Subscriber(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
try {if (predicate.call(t)) {//判断 child.onNext(t);} else { // TODO consider a more complicated version that batches these request(1);}
} catch (Throwable e) {Exceptions.throwOrReport(e, child, t);
}
}
};
}
}

***************************************************源码解析****************************************************************

4、线程调度:

.subscribeOn(Schedulers.io()):

***************************************************源码解析****************************************************************

public final Observable subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn(scheduler));
}

 

public final class Schedulers {
  //
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final Schedulers INSTANCE = new Schedulers();
   ....
}

public class OperatorSubscribeOn implements Operator> {
private final Scheduler scheduler;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber> call(final Subscriber subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber>(subscriber) {
@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable o) {
inner.schedule(new Action0() {//在选择的线程里做处理
@Overridepublic void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber(subscriber) {
@Override public void onCompleted() { subscriber.onCompleted(); }
@Override public void onError(Throwable e) { subscriber.onError(e); }
@Override public void onNext(T t) { subscriber.onNext(t); }
@Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() {
@Override public void request(final long n) { if (Thread.currentThread() == t) { // don‘t schedule if we‘re already on the thread (primarily for first setProducer call) // see unit test ‘testSetProducerSynchronousRequest‘ for more context on this producer.request(n); } else { inner.schedule(new Action0() {
@Override public void call() { producer.request(n); } }); } }
}); }
});}
});
}
};
}
}

***************************************************源码解析****************************************************************

.observeOn(AndroidSchedulers.mainThread()):
参考
OperatorObserveOn中源码;
基本思路:最好用代理思维去理解

(被观察者)Observable:(订阅者)OnSubscribe extends Action1 extends Action extends Function

——> 订阅 subscribe(Observer) 回调处理

(观察者)Observer:Subscriber implements Observer

 应用场景还没有,希望多交流多指正!

 








































































































































































































































































































































































































































































































































推荐阅读
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Google Play推出全新的应用内评价API,帮助开发者获取更多优质用户反馈。用户每天在Google Play上发表数百万条评论,这有助于开发者了解用户喜好和改进需求。开发者可以选择在适当的时间请求用户撰写评论,以获得全面而有用的反馈。全新应用内评价功能让用户无需返回应用详情页面即可发表评论,提升用户体验。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
  • Final关键字的含义及用法详解
    本文详细介绍了Java中final关键字的含义和用法。final关键字可以修饰非抽象类、非抽象类成员方法和变量。final类不能被继承,final类中的方法默认是final的。final方法不能被子类的方法覆盖,但可以被继承。final成员变量表示常量,只能被赋值一次,赋值后值不再改变。文章还讨论了final类和final方法的应用场景,以及使用final方法的两个原因:锁定方法防止修改和提高执行效率。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • React基础篇一 - JSX语法扩展与使用
    本文介绍了React基础篇一中的JSX语法扩展与使用。JSX是一种JavaScript的语法扩展,用于描述React中的用户界面。文章详细介绍了在JSX中使用表达式的方法,并给出了一个示例代码。最后,提到了JSX在编译后会被转化为普通的JavaScript对象。 ... [详细]
  • 恶意软件分析的最佳编程语言及其应用
    本文介绍了学习恶意软件分析和逆向工程领域时最适合的编程语言,并重点讨论了Python的优点。Python是一种解释型、多用途的语言,具有可读性高、可快速开发、易于学习的特点。作者分享了在本地恶意软件分析中使用Python的经验,包括快速复制恶意软件组件以更好地理解其工作。此外,作者还提到了Python的跨平台优势,使得在不同操作系统上运行代码变得更加方便。 ... [详细]
  • 【Windows】实现微信双开或多开的方法及步骤详解
    本文介绍了在Windows系统下实现微信双开或多开的方法,通过安装微信电脑版、复制微信程序启动路径、修改文本文件为bat文件等步骤,实现同时登录两个或多个微信的效果。相比于使用虚拟机的方法,本方法更简单易行,适用于任何电脑,并且不会消耗过多系统资源。详细步骤和原理解释请参考本文内容。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
author-avatar
卟抛棄D
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有