6. 线程调度 和 重试机制
一. 线程调度
1.subscribeOn()
指定Observable自身在哪个调度器上执行,它指示Observable在一个指定的调度器上给观察者发通知决定上游线程。
2.observeOn()
指定Observable在一个特定的调度器上发送通知给观察者决定下游线程 (调用观察者的onNext,onCompleted,onError方法)。
-
subscribeOn 给上游代码分配线程,上游切换到哪个线程,下游要是不改的话,rxJava就在这个线程一直运行,整个rxjava中严格说来真正只有一个上游,就是产生数据的位置,如just/creat其他任何变换 和 操作符, 注册都是下游。所以subscribeOn只有第一次切换有效作用范围也是最小的,就just/creat。
-
observeOn 给下游代码分配线程,基本上操作符都会生成一个新的observable处理,和之前的observable关联(其实也就是注册到之前的observable),所以在一个操作范围来看,前一个observable发送数据给我,算是上游,我这个操作符消费数据,产生新的observable算是下游,所以observeOn可以多次切换它之后的操作符的线程。
-
Observable.create(new ObservableOnSubscribe<String>() {
-
@Override
-
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
-
Log.d(TAG, "数据源" Thread.currentThread().getName());//computation
-
emitter.onNext("测试线程调度流程");
-
}
-
}).subscribeOn(Schedulers.computation()).map(new Function<String, String>() {
-
@Override
-
public String apply(@NonNull String s) throws Exception {
-
Log.d(TAG, "第1次变化" Thread.currentThread().getName()); //computation
-
return s;
-
}
-
}).subscribeOn(AndroidSchedulers.mainThread()).map(new Function<String, String>() {
-
@Override
-
public String apply(@NonNull String s) throws Exception {
-
Log.d(TAG, "第2次变化" Thread.currentThread().getName());//main
-
return s;
-
}
-
}).observeOn(Schedulers.io()).map(new Function<String, String>() {
-
@Override
-
public String apply(@NonNull String s) throws Exception {
-
Log.d(TAG, "第3次变化" Thread.currentThread().getName());//computation
-
return s;
-
}
-
}).observeOn(AndroidSchedulers.mainThread()).map(new Function<String, String>() {
-
@Override
-
public String apply(@NonNull String s) throws Exception {
-
Log.d(TAG, "第4次变化" Thread.currentThread().getName()); //main
-
return s;
-
}
-
}).subscribe(new Consumer<String>() {
-
@Override
-
public void accept(String s) throws Exception {
-
Log.d(TAG, "监听者" Thread.currentThread().getName()); //main
-
}
-
});
-
//执行结果
-
2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 数据源RxComputationThreadPool-1
-
2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第1次变化RxComputationThreadPool-1
-
2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第2次变化RxComputationThreadPool-1
-
2022-02-06 12:25:17.903 9891-10021/? D/KtActivity: 第3次变化RxCachedThreadScheduler-1
-
2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 第4次变化main
-
2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 监听者main
二. 错误重试
retrywhen()
观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
-
var count = 0
-
@SuppressLint("CheckResult")
-
private fun testRetryWhen() {
-
//模拟网络错误 错误重试: 3次
-
Observable.create<Int> { e ->
-
count
-
Log.d(TAG, "onCreate: 数据源 count = $count")
-
e.onNext(count)
-
}.flatMap {
-
if (it < 4) { //模拟网络错误
-
Observable.error(Exception())
-
} else {
-
Observable.just(it)
-
}
-
}.retryWhen(object : Function<Observable<Throwable>, ObservableSource<*>?> {
-
private var mRetryCount = 0
-
override fun apply(throwableObservable: Observable<Throwable>): ObservableSource<*>? {
-
return throwableObservable.flatMap { throwable: Throwable ->
-
mRetryCount
-
if (mRetryCount <= 3) {
-
Log.d(TAG, "获取数据失败重试第" mRetryCount "次错误-> $throwable")
-
}
-
if (throwable is Exception && mRetryCount <= 3) {
-
return@flatMap Observable.timer(500, TimeUnit.MILLISECONDS)
-
} else {
-
return@flatMap Observable.error<Int>(throwable)
-
}
-
}
-
}
-
}).subscribe(
-
{ s -> Log.d(TAG, "onNext: 获取到的数据 $s") },
-
{ throwable -> Log.d(TAG, "onError: ${throwable.message}") })
-
{ Log.d(TAG, "onComplete") }
-
}
-
//执行结果
-
2022-02-06 14:20:16.680 20551-20551/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 1
-
2022-02-06 14:20:16.681 20551-20551/com.xzh.cdemo D/KtActivity: 获取数据失败重试第1次错误-> java.lang.Exception
-
2022-02-06 14:20:17.191 20551-20727/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 2
-
2022-02-06 14:20:17.192 20551-20727/com.xzh.cdemo D/KtActivity: 获取数据失败重试第2次错误-> java.lang.Exception
-
2022-02-06 14:20:17.693 20551-20728/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 3
-
2022-02-06 14:20:17.694 20551-20728/com.xzh.cdemo D/KtActivity: 获取数据失败重试第3次错误
-
2022-02-06 14:20:18.197 20551-20729/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 4
-
2022-02-06 14:20:18.199 20551-20729/com.xzh.cdemo D/KtActivity: onNext: 获取到的数据 4
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcaafj
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13