• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RxJava从入门到精通RxJava源码初步

武飞扬头像
FishAnd_Yu
帮助1

Rxjava 源码学习(一):基本流程分析 - 知乎Rxjava 源码版本:Rxjava2.2.8 1. Rxjava 的基本实现首先看一下最简单的例子,具体查看其内部实现: 通过以下代码查看 Rxjava 的典型使用: Observable.create(new ObservableOnSubscribe<String>() { @Over…学新通https://zhuanlan.zhihu.com/p/307650066

概念共识

        1:Observable是被观察者,Observer是观察者,subsrcibe是让被观察者和观察者之间建立订阅关系。

      2:事件的发射自 上游到下游,事件的消费订阅:自下游到上游。

     3: 创建一个 ObservableCreate对象继承 Observable, 后面的发射事件,消费事件都是在这个对象中展开

1: Observable.create(new ObservableOnSubscrible())

     这也是Rxjava事件中的 :第一条线

     1. 仅仅是创建了 一个 Observerable 对象,这是一个抽象类并且实现 ObservableSource接口

public abstract class Observable<@NonNull T> implements ObservableSource<T>

      2. Observerable定义了生成各种操作符函数,比如 Map操作符函数

  1.  
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
  2.  
    Objects.requireNonNull(mapper, "mapper is null");
  3.  
    return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
  4.  
    }

    3:定义了关键函数 subscribe()来让 Observable和Observer产生订阅关系

    4:定义 抽象函数 subscribeActual(Observer observer) ,订阅函数subscribe()最终会调用 实现类的 subscribeActual(Observer observer)函数

protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

2: 通过subscribe 来产生订阅关系。

     这也是Rxjava事件中的 :第二条线

  1: Observable 对象通过 subscribe 与 Observer 产生订阅关系

  2:subscribe 订阅关系是一条很重要的流程线,通过创建各种操作符下的 Observable对象,可以实现从:上游到下游的事件发射。此时创建订阅关系是由最后一个  Observable对象开始的,最后要给Observable来观察事件,所以事件订阅的流程就是:从下游到上游。

3:事件发生器(Emitter)

这也是Rxjava事件中的 :第三条线

1:  Observer、Observable 分别和事件发生器(Emitter) 产生关联,并且通过回调来到事件发射现场Observable。

2:  根据产生的订阅链自上游到下游发布事件。

事件流可以自上而下进行下去,原因是 Observable 操作符 得到的还是 Observable,通过通过 Observable.subsribe 方法实现订阅关系。

4:基本使用

学新通

5: 代码示例

  1.  
    Observable.create(new ObservableOnSubscribe<Object>() {
  2.  
    @Override
  3.  
    public void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {
  4.  
    Log.d(TAG, "步骤二:发射事件");
  5.  
    observer.onNext("步骤二发射事件");
  6.  
    }
  7.  
    }).subscribe(new Observer<Object>() {
  8.  
    @Override
  9.  
    public void onSubscribe(@NonNull Disposable d) {
  10.  
    Log.d(TAG, "步骤一:不改变Disposable布尔值让其消费事件");
  11.  
    }
  12.  
     
  13.  
    @Override
  14.  
    public void onNext(@NonNull Object o) {
  15.  
    Log.d(TAG, "步骤三消费:" o.toString());
  16.  
    }
  17.  
     
  18.  
    @Override
  19.  
    public void onError(@NonNull Throwable e) {
  20.  
     
  21.  
    }
  22.  
     
  23.  
    @Override
  24.  
    public void onComplete() {
  25.  
     
  26.  
    }
  27.  
    });
  28.  
     
  29.  
     
  30.  
    打印结果:
  31.  
    步骤一:不改变Disposable布尔值让其消费事件
  32.  
    步骤二:发射事件
  33.  
    步骤三消费:步骤二发射事件
学新通

6:源码分析:

1 :Observable.create(new ObservableOnSubscribe<Object> ()) 

实际上是:构造了 ObservableCreate对象,这个就是最初的发射事件对象

  1.  
    Observable.java
  2.  
     
  3.  
     
  4.  
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
  5.  
    Objects.requireNonNull(source, "source is null");
  6.  
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
  7.  
    }

2:  Observable.subscribe(new Observer()) 

利用其多态的性质,此时Observable的实现类是 :步骤一的 ObservableCreate对象。随意subscribe(new Observer()) 最终会调用 ObservableCreate.subscribeAcutal() 函数

  1.  
    Observable.java
  2.  
     
  3.  
    public final void subscribe(@NonNull Observer<? super T> observer) {
  4.  
    Objects.requireNonNull(observer, "observer is null");
  5.  
    try {
  6.  
     
  7.  
    observer = RxJavaPlugins.onSubscribe(this, observer);
  8.  
     
  9.  
    Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
  10.  
     
  11.  
    // 利用多态的性质,调用 ObservableCreate的 subscribeAcutal(observer)函数
  12.  
    subscribeActual(observer);
  13.  
    } catch (NullPointerException e) { // NOPMD
  14.  
    throw e;
  15.  
    } catch (Throwable e) {
  16.  
    Exceptions.throwIfFatal(e);
  17.  
    // can't call onError because no way to know if a Disposable has been set or not
  18.  
    // can't call onSubscribe because the call might have set a Subscription already
  19.  
    RxJavaPlugins.onError(e);
  20.  
     
  21.  
    NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
  22.  
    npe.initCause(e);
  23.  
    throw npe;
  24.  
    }
  25.  
    }
学新通

3: ObservableCreate 会先调用 observer.onScbscribe(createEmitter) 函数,然后回调

      source.subscribe(createEmitter) 函数,(  这里的source就是 ObservableOnSubscribe)

     通过createEmitter 来发射事件 

  1.  
    ObservableCreate.java
  2.  
    protected void subscribeActual(Observer<? super T> observer) {
  3.  
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
  4.  
     
  5.  
    // Observable发射事件后,先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件
  6.  
    observer.onSubscribe(parent);
  7.  
     
  8.  
    try {
  9.  
     
  10.  
    // 通过source来发射事件
  11.  
    source.subscribe(parent);
  12.  
    } catch (Throwable ex) {
  13.  
    Exceptions.throwIfFatal(ex);
  14.  
    parent.onError(ex);
  15.  
    }
  16.  
    }
  17.  
     
  18.  
     
  19.  
    Observable.create(new ObservableOnSubscribe<Object>() {
  20.  
    @Override
  21.  
    public void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {
  22.  
    Log.d(TAG, "步骤二:发射事件");
  23.  
     
  24.  
    // 通过source来发射事件
  25.  
    observer.onNext("步骤二发射事件");
  26.  
    }
  27.  
    })
  28.  
     
  29.  
     
  30.  
     
  31.  
     
  32.  
    ObservableCreate.java
  33.  
     
  34.  
    static final class CreateEmitter<T>
  35.  
    extends AtomicReference<Disposable>
  36.  
    implements ObservableEmitter<T>, Disposable {
  37.  
     
  38.  
    private static final long serialVersionUID = -3434801548987643227L;
  39.  
     
  40.  
    final Observer<? super T> observer;
  41.  
     
  42.  
    CreateEmitter(Observer<? super T> observer) {
  43.  
    this.observer = observer;
  44.  
    }
  45.  
     
  46.  
    // 呼应 上面: 通过source来发射事件 流程
  47.  
    @Override
  48.  
    public void onNext(T t) {
  49.  
    if (t == null) {
  50.  
    onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
  51.  
    return;
  52.  
    }
  53.  
     
  54.  
    // 呼应上面 : 先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件
  55.  
    if (!isDisposed()) {
  56.  
    observer.onNext(t);
  57.  
    }
  58.  
    }
  59.  
     
  60.  
     
  61.  
     
  62.  
     
学新通

4 :如果在 Observer的 onSubscribe(Disposable) 中没有解除订阅流程,那么就可以让 消费者Observer来消费事件了

     学新通 

 7:Schedulers: 指定线程

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfkegf
系列文章
更多 icon
同类精品
更多 icon
继续加载