RxJava一创建操作符
1.create()
-
Observable.create(new ObservableOnSubscribe<String>() {
-
@Override
-
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
-
emitter.onNext("Hello RxJava!");
-
emitter.onComplete();
-
}
-
}).subscribe(new Observer<String>() {
-
@Override
-
public void onSubscribe(@NonNull Disposable d) {
-
-
}
-
-
@Override
-
public void onNext(@NonNull String str) {
-
Log.e("rxjava", "onNext " str);
-
}
-
-
@Override
-
public void onError(@NonNull Throwable e) {
-
e.printStackTrace();
-
}
-
-
@Override
-
public void onComplete() {
-
Log.e("rxjava", "onComplete");
-
}
-
});
-
执行结果:
-
rxjava: onNext Hello RxJava!
-
rxjava: onComplete
执行原理分析:
调用 emitter.onNext("Hello RxJava!")后,就会调用Observer的onNext(String str)。
是如何做到的呢?拆解一下下面的代码。
Observable.create()接收一个ObservableOnSubscribe,这是个接口,得实现subscribe()方法,
这个方法有一个参数ObservableEmitter<String> emitter
Observable.create() 的返回值是ObservableCreate类型,subscribe(new Observer<String>())就是设置给它的。
Observer 是一个接口,有四个需要实现的方法。
那么是如何实现,调用emitter.onNext("Hello RxJava!") 就会回调Observer的onNext(String str)方法的呢?
在创建new ObservableCreate<T>(source)时,把接收一个ObservableOnSubscribe作为参数传递进来了,而在subscribe时,又把observer传递进来了。
看看ObservableCreate内部,是如何把这两者关联起来的。
在调用subscribe时,会调用ObservableCreate的subscribeActual方法。
-
-
protected void subscribeActual(Observer<? super T> observer) {
-
//创建了一个发射器,并将observer作为参数传递进去了。这样两者就产生了关联
-
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
-
observer.onSubscribe(parent);
-
try {
-
//ObservableOnSubscribe<T> source,这个source就是Observable.create()的参数
-
//给ObservableOnSubscribe设置了一个发射器。
-
source.subscribe(parent);
-
} catch (Throwable ex) {
-
Exceptions.throwIfFatal(ex);
-
parent.onError(ex);
-
}
-
}
再看CreateEmitter内部实现。在CreateEmitter的onNext方法调用的observer的onNext方法。
这样就实现了外面看到现象。
-
public void onNext(T t) {
-
if (!isDisposed()) {
-
observer.onNext(t);
-
}
-
}
2.just()
-
Observable.just("Hello","RxJava").subscribe(new Observer<String>() {
-
@Override
-
public void onSubscribe(@NonNull Disposable d) {
-
-
}
-
-
@Override
-
public void onNext(@NonNull String s) {
-
Log.e("rxjava","onNext " s);
-
}
-
-
@Override
-
public void onError(@NonNull Throwable e) {
-
-
}
-
-
@Override
-
public void onComplete() {
-
Log.e("rxjava","onComplete ");
-
}
-
});
-
-
执行结果:
-
rxjava: onNext Hello
-
rxjava: onNext RxJava
-
rxjava: onComplete
just可以接收1-10个参数,有几个参数,就会调用机会onNext。执行完毕后,默认调用onComplete
会把这些个参数封装成一个可变参数的数组items,传递给ObservableFromArray,
Observable.just返回的是 ObservableFromArray<T>(items)。
在ObservableFromArray#subscribeActual方法中,会创建FromArrayDisposable<T>(observer, array)
在FromArrayDisposable内部会遍历这个数组,依次调用observer的onNext方法。
3.fromIterable()
-
List list = new ArrayList();
-
list.add("Hello");
-
list.add("RxJava");
-
Observable.fromIterable(list).subscribe(new Observer<String>() {
-
-
public void onSubscribe() { Disposable d
-
-
}
-
-
-
public void onNext(String o) {
-
Log.e("rxjava",o);
-
}
-
-
-
public void onError() { Throwable e
-
-
}
-
-
-
public void onComplete() {
-
-
}
-
});
-
执行结果:
-
rxjava: Hello
-
rxjava: RxJava
-
rxjava: onComplete
Observable.fromIterable(list)和just类似,fromIterable接收一个集合。
fromIterable 返回的是ObservableFromIterable<T>(source)。后面的调用逻辑和上面的类似
4.fromArray()
-
Observable.fromArray(1,2,3,4,5).subscribe(new Observer<Integer>() {
-
@Override
-
public void onSubscribe(@NonNull Disposable d) {
-
-
}
-
-
@Override
-
public void onNext(@NonNull Integer integer) {
-
Log.e("rxjava","integer " integer);
-
}
-
-
@Override
-
public void onError(@NonNull Throwable e) {
-
-
}
-
-
@Override
-
public void onComplete() {
-
Log.e("rxjava","onComplete");
-
}
-
});
-
执行结果:
-
rxjava: integer 1
-
rxjava: integer 2
-
rxjava: integer 3
-
rxjava: integer 4
-
rxjava: integer 5
-
rxjava: onComplete
Observable.fromArray接收一个泛型数组。返回的是ObservableFromArray<T>(items) 和Just一样。
5.rang 范围操作符
-
Observable.range(1,3).subscribe(new Observer<Integer>() {
-
@Override
-
public void onSubscribe(@NonNull Disposable d) {
-
-
}
-
-
@Override
-
public void onNext(@NonNull Integer integer) {
-
Log.e("rxjava","integer" integer);
-
}
-
-
@Override
-
public void onError(@NonNull Throwable e) {
-
-
}
-
-
@Override
-
public void onComplete() {
-
Log.e("rxjava","onComplete");
-
}
-
});
-
执行结果:
-
rxjava: integer1
-
rxjava: integer2
-
rxjava: integer3
-
rxjava: onComplete
进行一个for循环,来调用Observer的onNext方法,执行结束后会调用Observer的onComplete方法
6.timer:计时器
-
Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
-
Observable.timer(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
-
@Override
-
public void accept(Long aLong) throws Exception {
-
Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
-
Log.e("rxjava",aLong.toString());
-
}
-
});
-
执行结果:
-
rxjava: currentThreadName main
-
rxjava: currentThreadName RxComputationThreadPool-1
-
rxjava: 0
Observable.timer(3, TimeUnit.SECONDS),接收两个参数,时间长度和时间单位。多长时间之后,执行accept方法。
Observable.timer返回ObservableTimer。在ObservableTimer中通过调度器创建一个线程。
从执行结果看timer运行在一个在这个线程中,可以通过第三个参数指定调度器Scheduler。
比如AndroidSchedulers.mainThread()或 Schedulers.io()
-
Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
-
Observable.timer(3,TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
-
@Override
-
public void accept(Long aLong) throws Exception {
-
Log.e("rxjava","currentThreadName " Thread.currentThread().getName());
-
Log.e("rxjava",aLong.toString());
-
}
-
});
-
执行结果:
-
rxjava: currentThreadName main
-
rxjava: currentThreadName main
-
rxjava: 0
通过执行结果可以看出 AndroidSchedulers.mainThread()是指定timer运行在主线程中。
7.interval 时间间隔
-
Observable.interval(2,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
-
-
public void accept(Long aLong) throws Exception {
-
Log.e("rxjava",aLong.toString());
-
}
-
});
Observable.interval 接收俩参数,第一个时间间隔,第二个时间单位。
interval操作符,会根据设置的时间间隔,不断的执行accept(Long aLong)方法
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgccacb
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13