• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

笔记Rxjava的使用和设计原理初探

武飞扬头像
_Mostly_Harmless
帮助2

一、Demo

1 依赖

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'

2 使用

创建Observable实例,subscribe传入观察者即可,在subscribe前也可以使用其他操作符做一些操作。

  1.  
    Function function = new Function<String, String>() {
  2.  
    @Override
  3.  
    public String apply(String s) throws Exception {
  4.  
    String result = "result" s;
  5.  
    return result;
  6.  
    }
  7.  
    };
  8.  
     
  9.  
    Observer observer = new Observer<String>() {
  10.  
    @Override
  11.  
    public void onSubscribe(Disposable d) {
  12.  
    }
  13.  
     
  14.  
    @Override
  15.  
    public void onNext(String string) {
  16.  
    }
  17.  
     
  18.  
    @Override
  19.  
    public void onError(Throwable e) {
  20.  
    }
  21.  
     
  22.  
    @Override
  23.  
    public void onComplete() {
  24.  
    }
  25.  
    };
  26.  
     
  27.  
    Observable.create(new ObservableOnSubscribe<String>() {
  28.  
    @Override
  29.  
    public void subscribe(ObservableEmitter<String> e) throws Exception {
  30.  
    e.onNext("First");
  31.  
    e.onNext("SEC");
  32.  
    e.onComplete();
  33.  
    }
  34.  
    })
  35.  
    .subscribeOn(Schedulers.io())
  36.  
    .observeOn(AndroidSchedulers.mainThread())
  37.  
    .map(function)
  38.  
    .subscribe(observer);
学新通

1 Observable.create创建一个Observable对象,入参ObservableOnSubscribe的subscribe会在链式调用的最终调用subscribe操作后触发,在这里可以使用ObservableEmitter执行发送数据等操作,最终会触发Observer观察者响应。

2 subscribeOn操作符是为subscribeOn前的操作设置线程调度器,在subscribeOn前的操作都会以它指定的线程调度器调度。

3 observeOn操作符是为此操作符后的操作设置线程调度器,与subscribeOn相对应。

4 map操作符是转换处理数据,map前传递下来的数据,经由map处理生成新的数据继续向下传递。

5 subscribe把一个observer注册订阅。

二、原理

以本demo为例:

        调用create,框架会创建一个ObservableCreate(Observable子类)对象,把ObservableOnSubscribe保存为成员source,并返回ObservableCreate对象。

  1.  
    Observable.create(new ObservableOnSubscribe<String>() {
  2.  
    @Override
  3.  
    public void subscribe(ObservableEmitter<String> e) throws Exception {
  4.  
    // 发送数据1
  5.  
    e.onNext("First");
  6.  
    // 发送数据2
  7.  
    e.onNext("SEC");
  8.  
    e.onComplete();
  9.  
    }
  10.  
    })

        subscribeOn给之前的操作分配IO调度器。本操作符会生成ObservableSubscribeOn(Observable子类)对象,把上一操作符生成对象以自己的成员source保存并返回ObservableSubscribeOn对象。本次操作的调用者是上一个操作符生成的ObservableCreate对象。

.subscribeOn(Schedulers.io())

       observeOn给后续操作切换为主线程。同上,本次会生成ObservableObserveOn(Observable子类)对象并把上一操作符生成对象保存为自己的成员source。

.observeOn(AndroidSchedulers.mainThread())

        map操作,将上文数据在此处加工处理,处理后数据再传递给下文使用。同样的,本次生成ObservableMap(Observable子类)对象并包装上一层的返回对象为成员source。

  1.  
    // <源数据类型, 目标数据类型>
  2.  
    Function function = new Function<String, String>() {
  3.  
    @Override
  4.  
    public String apply(String s) throws Exception {
  5.  
    // s:源数据
  6.  
    String result = "result" s;
  7.  
    // return 将处理后的数据result向下传递
  8.  
    return result;
  9.  
    }
  10.  
    };

         subscribe订阅,把Observer注册进来,订阅Observable信息。(之前的操作都仅仅是创建了Observable对象;另:之前的操作符生成的对象都是Observable实现子类,本质都是Observable对象。)

  1.  
    Observer observer = new Observer<String>() {
  2.  
    @Override
  3.  
    public void onSubscribe(Disposable d) {
  4.  
    }
  5.  
     
  6.  
    @Override
  7.  
    public void onNext(String string) {
  8.  
    }
  9.  
     
  10.  
    @Override
  11.  
    public void onError(Throwable e) {
  12.  
    }
  13.  
     
  14.  
    @Override
  15.  
    public void onComplete() {
  16.  
    }
  17.  
    };
  18.  
     
  19.  
    ...
  20.  
    .subscribe(observer);
学新通

        至此,subscribe之前的最后一个操作符map(在本例中是map)返回了一个最终的ObservableMap对象,它层层包裹了自下而上的每个操作符生成的Observable他们各自保存了各自上层的Observable为source成员。最后调用了.subscribe(observer)把观察者注册进来。

        subscibe是Observable的方法,他调用自己的实现子类的subscribeActual做具体操作,因此当ObservableMap.subscibe时实际调用的是ObservableMap的subscribeActual,其关键实现是source.subscribe(其他的Observable.subscribe正常情况也都是调用自己的source.subscribe),既调用自己包裹的上一层操作符生成的Observable的subscribe,并生成一个Observer对象作为方法参数传入进去。

        上一层调用生成的Observable调用subscribe又会调用自己包裹的自己的上层生成的Observable的subscribe,由此触发连锁反应一直拆包调用下去,最终实际效果就是层层剥开包裹,按照操作符的顺序自下而上一直调用到最顶端生成的Observer的subscribe,并且每层调用subscribe都会把方法入参Observer重新包裹一层自己的Observer并继续通过subscribe传参下去。

        这样,最后一个Observable也就是顶端操作符生成的Observable的subscribe方法接收到的参数就又是一个层层包裹着的Observer(本例中传到最顶端ObservableOnSubscribe最外一层是一个发射器CreateEmitter)。调用到顶层subscribe的实现,可以使用CreateEmitter发射器做相关操作(如onNext、onComplite等),实际就是一层层拆包Observer并调用他们的对应的操作(如onNext、onComplite等)的过程。最终传递到最底层的实际的Observer订阅者。

图:

学新通

本例中的两个问题

1 subscribeOn和observeOn是怎么实现分别给上层下层调用分配不同的线程的。

    subscribeOn是给上层分配线程调度器,subscribe调用是自下而上拆包执行的,当执行到subscribeOn的包时,他就分配新线程调用剩下的subscribe了,因此自subscribeOn起再往上就变成了切换后的线程。

    而observeOn在自下而上调用subscribe时并没有把线程启动,而是把调度任务封装到了Observer参数中传递上去了,因此上方的调用不会受他的影响而切换线程;而当subscribe传递到最上层发送任务时,开始自上而下层层拆包Observer执行任务时,调度器就触发了,因此observeOn向下的操作就被observeOn指定的调度器改变了线程了。

2 map的Function是合适被触发的。

    map操作有一个Function apply回调,他是由ObservableMap的onNext调用的,因此,他也是顶层发送onNext消息自上而下拆包到map时触发的。

三、总结:

1 每个节点的调用都是创建一个自己对应的Observable,把上一个节点的Observable保存为自己的source成员变量,在调用subscribe之前一直层层包裹。

2 最后一个节点调用subscribe就是拆开包裹调用自己的source.subscribe,并把自己的observer传参进去,由此触发一层层拆包调用自己的source.subscribe,每次拆包都创建一个自己的observer把入参的observer包进去,再传参下去,最终拆到最顶层,并传递给了顶层一个层层包裹的observer发射器。

3 顶层subscribe是提供给发布者(被观察者)发布消息的,在这里通过observer包裹调用onNext onComplete等,实际是拆包observer执行他们各自的onNext onComplete等,实际达到各自的目的,最终调用到包裹最里层就是注册的observer。

        总而言之:rxjava的链式调用是一个从上而下打包Observable的过程,最末端调用subscribe注册observer是一个把Observable拆包并层层打包Observer向上传递的过程,拆包到最顶层,顶层Observable发布数据就又是一个通过拆包Observer传递数据给最底层注册的observer使用最终数据的过程。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcccia
系列文章
更多 icon
同类精品
更多 icon
继续加载