笔记Rxjava的使用和设计原理初探
一、Demo
1 依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
2 使用
创建Observable实例,subscribe传入观察者即可,在subscribe前也可以使用其他操作符做一些操作。
-
Function function = new Function<String, String>() {
-
-
public String apply(String s) throws Exception {
-
String result = "result" s;
-
return result;
-
}
-
};
-
-
Observer observer = new Observer<String>() {
-
-
public void onSubscribe(Disposable d) {
-
}
-
-
-
public void onNext(String string) {
-
}
-
-
-
public void onError(Throwable e) {
-
}
-
-
-
public void onComplete() {
-
}
-
};
-
-
Observable.create(new ObservableOnSubscribe<String>() {
-
-
public void subscribe(ObservableEmitter<String> e) throws Exception {
-
e.onNext("First");
-
e.onNext("SEC");
-
e.onComplete();
-
}
-
})
-
.subscribeOn(Schedulers.io())
-
.observeOn(AndroidSchedulers.mainThread())
-
.map(function)
-
.subscribe(observer);
1 Observable.create创建一个Observable对象,入参ObservableOnSubscribe的subscribe会在链式调用的最终调用subscribe操作后触发,在这里可以使用ObservableEmitter执行发送数据等操作,最终会触发Observer观察者响应。
2 subscribeOn操作符是为subscribeOn前的操作设置线程调度器,在subscribeOn前的操作都会以它指定的线程调度器调度。
3 observeOn操作符是为此操作符后的操作设置线程调度器,与subscribeOn相对应。
4 map操作符是转换处理数据,map前传递下来的数据,经由map处理生成新的数据继续向下传递。
5 subscribe把一个observer注册订阅。
二、原理
以本demo为例:
调用create,框架会创建一个ObservableCreate(Observable子类)对象,把ObservableOnSubscribe保存为成员source,并返回ObservableCreate对象。
-
Observable.create(new ObservableOnSubscribe<String>() {
-
-
public void subscribe(ObservableEmitter<String> e) throws Exception {
-
// 发送数据1
-
e.onNext("First");
-
// 发送数据2
-
e.onNext("SEC");
-
e.onComplete();
-
}
-
})
subscribeOn给之前的操作分配IO调度器。本操作符会生成ObservableSubscribeOn(Observable子类)对象,把上一操作符生成对象以自己的成员source保存并返回ObservableSubscribeOn对象。本次操作的调用者是上一个操作符生成的ObservableCreate对象。
.subscribeOn(Schedulers.io())
observeOn给后续操作切换为主线程。同上,本次会生成ObservableObserveOn(Observable子类)对象并把上一操作符生成对象保存为自己的成员source。
.observeOn(AndroidSchedulers.mainThread())
map操作,将上文数据在此处加工处理,处理后数据再传递给下文使用。同样的,本次生成ObservableMap(Observable子类)对象并包装上一层的返回对象为成员source。
-
// <源数据类型, 目标数据类型>
-
Function function = new Function<String, String>() {
-
-
public String apply(String s) throws Exception {
-
// s:源数据
-
String result = "result" s;
-
// return 将处理后的数据result向下传递
-
return result;
-
}
-
};
subscribe订阅,把Observer注册进来,订阅Observable信息。(之前的操作都仅仅是创建了Observable对象;另:之前的操作符生成的对象都是Observable实现子类,本质都是Observable对象。)
-
Observer observer = new Observer<String>() {
-
-
public void onSubscribe(Disposable d) {
-
}
-
-
-
public void onNext(String string) {
-
}
-
-
-
public void onError(Throwable e) {
-
}
-
-
-
public void onComplete() {
-
}
-
};
-
-
...
-
.subscribe(observer);
至此,subscribe之前的最后一个操作符map(在本例中是map)返回了一个最终的ObservableMap对象,它层层包裹了自下而上的每个操作符生成的Observable他们各自保存了各自上层的Observable为source成员。最后调用了.subscribe(observer)把观察者注册进来。
subscibe是Observable的方法,他调用自己的实现子类的subscribeActual做具体操作,因此当ObservableMap.subscibe时实际调用的是ObservableMap的subscribeActual,其关键实现是source.subscribe(其他的Observable.subscribe正常情况也都是调用自己的source.subscribe),既调用自己包裹的上一层操作符生成的Observable的subscribe,并生成一个Observer对象作为方法参数传入进去。
上一层调用生成的Observable调用subscribe又会调用自己包裹的自己的上层生成的Observable的subscribe,由此触发连锁反应一直拆包调用下去,最终实际效果就是层层剥开包裹,按照操作符的顺序自下而上一直调用到最顶端生成的Observer的subscribe,并且每层调用subscribe都会把方法入参Observer重新包裹一层自己的Observer并继续通过subscribe传参下去。
这样,最后一个Observable也就是顶端操作符生成的Observable的subscribe方法接收到的参数就又是一个层层包裹着的Observer(本例中传到最顶端ObservableOnSubscribe最外一层是一个发射器CreateEmitter)。调用到顶层subscribe的实现,可以使用CreateEmitter发射器做相关操作(如onNext、onComplite等),实际就是一层层拆包Observer并调用他们的对应的操作(如onNext、onComplite等)的过程。最终传递到最底层的实际的Observer订阅者。
图:
本例中的两个问题
1 subscribeOn和observeOn是怎么实现分别给上层下层调用分配不同的线程的。
subscribeOn是给上层分配线程调度器,subscribe调用是自下而上拆包执行的,当执行到subscribeOn的包时,他就分配新线程调用剩下的subscribe了,因此自subscribeOn起再往上就变成了切换后的线程。
而observeOn在自下而上调用subscribe时并没有把线程启动,而是把调度任务封装到了Observer参数中传递上去了,因此上方的调用不会受他的影响而切换线程;而当subscribe传递到最上层发送任务时,开始自上而下层层拆包Observer执行任务时,调度器就触发了,因此observeOn向下的操作就被observeOn指定的调度器改变了线程了。
2 map的Function是合适被触发的。
map操作有一个Function apply回调,他是由ObservableMap的onNext调用的,因此,他也是顶层发送onNext消息自上而下拆包到map时触发的。
三、总结:
1 每个节点的调用都是创建一个自己对应的Observable,把上一个节点的Observable保存为自己的source成员变量,在调用subscribe之前一直层层包裹。
2 最后一个节点调用subscribe就是拆开包裹调用自己的source.subscribe,并把自己的observer传参进去,由此触发一层层拆包调用自己的source.subscribe,每次拆包都创建一个自己的observer把入参的observer包进去,再传参下去,最终拆到最顶层,并传递给了顶层一个层层包裹的observer发射器。
3 顶层subscribe是提供给发布者(被观察者)发布消息的,在这里通过observer包裹调用onNext onComplete等,实际是拆包observer执行他们各自的onNext onComplete等,实际达到各自的目的,最终调用到包裹最里层就是注册的observer。
总而言之:rxjava的链式调用是一个从上而下打包Observable的过程,最末端调用subscribe注册observer是一个把Observable拆包并层层打包Observer向上传递的过程,拆包到最顶层,顶层Observable发布数据就又是一个通过拆包Observer传递数据给最底层注册的observer使用最终数据的过程。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcccia
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01