• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

6. 线程调度 和 重试机制

武飞扬头像
NorthStar131
帮助1

学新通

一. 线程调度

1.subscribeOn()

指定Observable自身在哪个调度器上执行,它指示Observable在一个指定的调度器上给观察者发通知决定上游线程

学新通

2.observeOn()

指定Observable在一个特定的调度器上发送通知给观察者决定下游线程 (调用观察者的onNext,onCompleted,onError方法)。

学新通

  • subscribeOn 给上游代码分配线程,上游切换到哪个线程,下游要是不改的话,rxJava就在这个线程一直运行,整个rxjava中严格说来真正只有一个上游,就是产生数据的位置,如just/creat其他任何变换 和 操作符, 注册都是下游。所以subscribeOn只有第一次切换有效作用范围也是最小的,就just/creat。

  • observeOn 给下游代码分配线程,基本上操作符都会生成一个新的observable处理,和之前的observable关联(其实也就是注册到之前的observable),所以在一个操作范围来看,前一个observable发送数据给我,算是上游,我这个操作符消费数据,产生新的observable算是下游,所以observeOn可以多次切换它之后的操作符的线程。

  1.  
    Observable.create(new ObservableOnSubscribe<String>() {
  2.  
    @Override
  3.  
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
  4.  
    Log.d(TAG, "数据源" Thread.currentThread().getName());//computation
  5.  
    emitter.onNext("测试线程调度流程");
  6.  
    }
  7.  
    }).subscribeOn(Schedulers.computation()).map(new Function<String, String>() {
  8.  
    @Override
  9.  
    public String apply(@NonNull String s) throws Exception {
  10.  
    Log.d(TAG, "第1次变化" Thread.currentThread().getName()); //computation
  11.  
    return s;
  12.  
    }
  13.  
    }).subscribeOn(AndroidSchedulers.mainThread()).map(new Function<String, String>() {
  14.  
    @Override
  15.  
    public String apply(@NonNull String s) throws Exception {
  16.  
    Log.d(TAG, "第2次变化" Thread.currentThread().getName());//main
  17.  
    return s;
  18.  
    }
  19.  
    }).observeOn(Schedulers.io()).map(new Function<String, String>() {
  20.  
    @Override
  21.  
    public String apply(@NonNull String s) throws Exception {
  22.  
    Log.d(TAG, "第3次变化" Thread.currentThread().getName());//computation
  23.  
    return s;
  24.  
    }
  25.  
    }).observeOn(AndroidSchedulers.mainThread()).map(new Function<String, String>() {
  26.  
    @Override
  27.  
    public String apply(@NonNull String s) throws Exception {
  28.  
    Log.d(TAG, "第4次变化" Thread.currentThread().getName()); //main
  29.  
    return s;
  30.  
    }
  31.  
    }).subscribe(new Consumer<String>() {
  32.  
    @Override
  33.  
    public void accept(String s) throws Exception {
  34.  
    Log.d(TAG, "监听者" Thread.currentThread().getName()); //main
  35.  
    }
  36.  
    });
  37.  
    //执行结果
  38.  
    2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 数据源RxComputationThreadPool-1
  39.  
    2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第1次变化RxComputationThreadPool-1
  40.  
    2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第2次变化RxComputationThreadPool-1
  41.  
    2022-02-06 12:25:17.903 9891-10021/? D/KtActivity: 第3次变化RxCachedThreadScheduler-1
  42.  
    2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 第4次变化main
  43.  
    2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 监听者main
学新通

学新通

学新通

二. 错误重试

retrywhen()

观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

学新通

  1.  
    var count = 0
  2.  
    @SuppressLint("CheckResult")
  3.  
    private fun testRetryWhen() {
  4.  
    //模拟网络错误 错误重试: 3
  5.  
    Observable.create<Int> { e ->
  6.  
    count
  7.  
    Log.d(TAG, "onCreate: 数据源 count = $count")
  8.  
    e.onNext(count)
  9.  
    }.flatMap {
  10.  
    if (it < 4) { //模拟网络错误
  11.  
    Observable.error(Exception())
  12.  
    } else {
  13.  
    Observable.just(it)
  14.  
    }
  15.  
    }.retryWhen(object : Function<Observable<Throwable>, ObservableSource<*>?> {
  16.  
    private var mRetryCount = 0
  17.  
    override fun apply(throwableObservable: Observable<Throwable>): ObservableSource<*>? {
  18.  
    return throwableObservable.flatMap { throwable: Throwable ->
  19.  
    mRetryCount
  20.  
    if (mRetryCount <= 3) {
  21.  
    Log.d(TAG, "获取数据失败重试第" mRetryCount "次错误-> $throwable")
  22.  
    }
  23.  
    if (throwable is Exception && mRetryCount <= 3) {
  24.  
    return@flatMap Observable.timer(500, TimeUnit.MILLISECONDS)
  25.  
    } else {
  26.  
    return@flatMap Observable.error<Int>(throwable)
  27.  
    }
  28.  
    }
  29.  
    }
  30.  
    }).subscribe(
  31.  
    { s -> Log.d(TAG, "onNext: 获取到的数据 $s") },
  32.  
    { throwable -> Log.d(TAG, "onError: ${throwable.message}") })
  33.  
    { Log.d(TAG, "onComplete") }
  34.  
    }
  35.  
    //执行结果
  36.  
    2022-02-06 14:20:16.680 20551-20551/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 1
  37.  
    2022-02-06 14:20:16.681 20551-20551/com.xzh.cdemo D/KtActivity: 获取数据失败重试第1次错误-> java.lang.Exception
  38.  
    2022-02-06 14:20:17.191 20551-20727/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 2
  39.  
    2022-02-06 14:20:17.192 20551-20727/com.xzh.cdemo D/KtActivity: 获取数据失败重试第2次错误-> java.lang.Exception
  40.  
    2022-02-06 14:20:17.693 20551-20728/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 3
  41.  
    2022-02-06 14:20:17.694 20551-20728/com.xzh.cdemo D/KtActivity: 获取数据失败重试第3次错误
  42.  
    2022-02-06 14:20:18.197 20551-20729/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 4
  43.  
    2022-02-06 14:20:18.199 20551-20729/com.xzh.cdemo D/KtActivity: onNext: 获取到的数据 4
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcaafj
系列文章
更多 icon
同类精品
更多 icon
继续加载