• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RxJava一创建操作符

武飞扬头像
niuyongzhi
帮助1

1.create()

  1.  
    Observable.create(new ObservableOnSubscribe<String>() {
  2.  
    @Override
  3.  
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
  4.  
    emitter.onNext("Hello RxJava!");
  5.  
    emitter.onComplete();
  6.  
    }
  7.  
    }).subscribe(new Observer<String>() {
  8.  
    @Override
  9.  
    public void onSubscribe(@NonNull Disposable d) {
  10.  
     
  11.  
    }
  12.  
     
  13.  
    @Override
  14.  
    public void onNext(@NonNull String str) {
  15.  
    Log.e("rxjava", "onNext " str);
  16.  
    }
  17.  
     
  18.  
    @Override
  19.  
    public void onError(@NonNull Throwable e) {
  20.  
    e.printStackTrace();
  21.  
    }
  22.  
     
  23.  
    @Override
  24.  
    public void onComplete() {
  25.  
    Log.e("rxjava", "onComplete");
  26.  
    }
  27.  
    });
学新通
  1.  
    执行结果:
  2.  
    rxjava: onNext Hello RxJava!
  3.  
    rxjava: onComplete

执行原理分析:

调用 emitter.onNext("Hello RxJava!")后,就会调用Observer的onNext(String str)。
  是如何做到的呢?拆解一下下面的代码。
  Observable.create()接收一个ObservableOnSubscribe,这是个接口,得实现subscribe()方法,
                      这个方法有一个参数ObservableEmitter<String> emitter
   Observable.create() 的返回值是ObservableCreate类型,subscribe(new Observer<String>())就是设置给它的。
                       Observer 是一个接口,有四个需要实现的方法。
  那么是如何实现,调用emitter.onNext("Hello RxJava!") 就会回调Observer的onNext(String str)方法的呢?
  在创建new ObservableCreate<T>(source)时,把接收一个ObservableOnSubscribe作为参数传递进来了,而在subscribe时,又把observer传递进来了。
  看看ObservableCreate内部,是如何把这两者关联起来的。
  在调用subscribe时,会调用ObservableCreate的subscribeActual方法。
 

  1.  
     
  2.  
    protected void subscribeActual(Observer<? super T> observer) {
  3.  
    //创建了一个发射器,并将observer作为参数传递进去了。这样两者就产生了关联
  4.  
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
  5.  
    observer.onSubscribe(parent);
  6.  
    try {
  7.  
    //ObservableOnSubscribe<T> source,这个source就是Observable.create()的参数
  8.  
    //给ObservableOnSubscribe设置了一个发射器。
  9.  
    source.subscribe(parent);
  10.  
    } catch (Throwable ex) {
  11.  
    Exceptions.throwIfFatal(ex);
  12.  
    parent.onError(ex);
  13.  
    }
  14.  
    }

再看CreateEmitter内部实现。在CreateEmitter的onNext方法调用的observer的onNext方法。
这样就实现了外面看到现象。

  1.  
    public void onNext(T t) {
  2.  
    if (!isDisposed()) {
  3.  
    observer.onNext(t);
  4.  
    }
  5.  
    }

2.just()

  1.  
    Observable.just("Hello","RxJava").subscribe(new Observer<String>() {
  2.  
    @Override
  3.  
    public void onSubscribe(@NonNull Disposable d) {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    @Override
  8.  
    public void onNext(@NonNull String s) {
  9.  
    Log.e("rxjava","onNext " s);
  10.  
    }
  11.  
     
  12.  
    @Override
  13.  
    public void onError(@NonNull Throwable e) {
  14.  
     
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onComplete() {
  19.  
    Log.e("rxjava","onComplete ");
  20.  
    }
  21.  
    });
  22.  
     
  23.  
    执行结果:
  24.  
    rxjava: onNext Hello
  25.  
    rxjava: onNext RxJava
  26.  
    rxjava: onComplete
学新通

just可以接收1-10个参数,有几个参数,就会调用机会onNext。执行完毕后,默认调用onComplete
会把这些个参数封装成一个可变参数的数组items,传递给ObservableFromArray,
Observable.just返回的是 ObservableFromArray<T>(items)。
在ObservableFromArray#subscribeActual方法中,会创建FromArrayDisposable<T>(observer, array)
在FromArrayDisposable内部会遍历这个数组,依次调用observer的onNext方法。

3.fromIterable()

  1.  
    List list = new ArrayList();
  2.  
    list.add("Hello");
  3.  
    list.add("RxJava");
  4.  
    Observable.fromIterable(list).subscribe(new Observer<String>() {
  5.  
    @Override
  6.  
    public void onSubscribe(@NonNull Disposable d) {
  7.  
     
  8.  
    }
  9.  
     
  10.  
    @Override
  11.  
    public void onNext(@NonNull String o) {
  12.  
    Log.e("rxjava",o);
  13.  
    }
  14.  
     
  15.  
    @Override
  16.  
    public void onError(@NonNull Throwable e) {
  17.  
     
  18.  
    }
  19.  
     
  20.  
    @Override
  21.  
    public void onComplete() {
  22.  
     
  23.  
    }
  24.  
    });
  25.  
    执行结果:
  26.  
    rxjava: Hello
  27.  
    rxjava: RxJava
  28.  
    rxjava: onComplete
学新通

Observable.fromIterable(list)和just类似,fromIterable接收一个集合。
fromIterable 返回的是ObservableFromIterable<T>(source)。后面的调用逻辑和上面的类似

4.fromArray()

  1.  
    Observable.fromArray(1,2,3,4,5).subscribe(new Observer<Integer>() {
  2.  
    @Override
  3.  
    public void onSubscribe(@NonNull Disposable d) {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    @Override
  8.  
    public void onNext(@NonNull Integer integer) {
  9.  
    Log.e("rxjava","integer " integer);
  10.  
    }
  11.  
     
  12.  
    @Override
  13.  
    public void onError(@NonNull Throwable e) {
  14.  
     
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onComplete() {
  19.  
    Log.e("rxjava","onComplete");
  20.  
    }
  21.  
    });
  22.  
    执行结果:
  23.  
    rxjava: integer 1
  24.  
    rxjava: integer 2
  25.  
    rxjava: integer 3
  26.  
    rxjava: integer 4
  27.  
    rxjava: integer 5
  28.  
    rxjava: onComplete
学新通

Observable.fromArray接收一个泛型数组。返回的是ObservableFromArray<T>(items) 和Just一样。
5.rang 范围操作符

  1.  
    Observable.range(1,3).subscribe(new Observer<Integer>() {
  2.  
    @Override
  3.  
    public void onSubscribe(@NonNull Disposable d) {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    @Override
  8.  
    public void onNext(@NonNull Integer integer) {
  9.  
    Log.e("rxjava","integer" integer);
  10.  
    }
  11.  
     
  12.  
    @Override
  13.  
    public void onError(@NonNull Throwable e) {
  14.  
     
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onComplete() {
  19.  
    Log.e("rxjava","onComplete");
  20.  
    }
  21.  
    });
  22.  
    执行结果:
  23.  
    rxjava: integer1
  24.  
    rxjava: integer2
  25.  
    rxjava: integer3
  26.  
    rxjava: onComplete
学新通

进行一个for循环,来调用Observer的onNext方法,执行结束后会调用Observer的onComplete方法
6.timer:计时器

  1.  
    Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
  2.  
    Observable.timer(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
  3.  
    @Override
  4.  
    public void accept(Long aLong) throws Exception {
  5.  
    Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
  6.  
    Log.e("rxjava",aLong.toString());
  7.  
    }
  8.  
    });
  9.  
    执行结果:
  10.  
    rxjava: currentThreadName main
  11.  
    rxjava: currentThreadName RxComputationThreadPool-1
  12.  
    rxjava: 0

Observable.timer(3, TimeUnit.SECONDS),接收两个参数,时间长度和时间单位。多长时间之后,执行accept方法。
Observable.timer返回ObservableTimer。在ObservableTimer中通过调度器创建一个线程。
从执行结果看timer运行在一个在这个线程中,可以通过第三个参数指定调度器Scheduler。
比如AndroidSchedulers.mainThread()或 Schedulers.io()

  1.  
    Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
  2.  
    Observable.timer(3,TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
  3.  
    @Override
  4.  
    public void accept(Long aLong) throws Exception {
  5.  
    Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
  6.  
    Log.e("rxjava",aLong.toString());
  7.  
    }
  8.  
    });
  9.  
    执行结果:
  10.  
    rxjava: currentThreadName main
  11.  
    rxjava: currentThreadName main
  12.  
    rxjava: 0

通过执行结果可以看出 AndroidSchedulers.mainThread()是指定timer运行在主线程中。
7.interval 时间间隔

  1.  
    Observable.interval(2,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
  2.  
    @Override
  3.  
    public void accept(Long aLong) throws Exception {
  4.  
    Log.e("rxjava",aLong.toString());
  5.  
    }
  6.  
    });

 Observable.interval 接收俩参数,第一个时间间隔,第二个时间单位。
 interval操作符,会根据设置的时间间隔,不断的执行accept(Long aLong)方法

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgccacb
系列文章
更多 icon
同类精品
更多 icon
继续加载