RxJava从入门到精通RxJava源码初步
概念共识
1:Observable是被观察者,Observer是观察者,subsrcibe是让被观察者和观察者之间建立订阅关系。
2:事件的发射自 上游到下游,事件的消费订阅:自下游到上游。
3: 创建一个 ObservableCreate对象继承 Observable, 后面的发射事件,消费事件都是在这个对象中展开
1: Observable.create(new ObservableOnSubscrible())
这也是Rxjava事件中的 :第一条线
1. 仅仅是创建了 一个 Observerable 对象,这是一个抽象类并且实现 ObservableSource接口
public abstract class Observable<@NonNull T> implements ObservableSource<T>
2. Observerable定义了生成各种操作符函数,比如 Map操作符函数
-
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
-
Objects.requireNonNull(mapper, "mapper is null");
-
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
-
}
3:定义了关键函数 subscribe()来让 Observable和Observer产生订阅关系
4:定义 抽象函数 subscribeActual(Observer observer) ,订阅函数subscribe()最终会调用 实现类的 subscribeActual(Observer observer)函数
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
2: 通过subscribe 来产生订阅关系。
这也是Rxjava事件中的 :第二条线
1: Observable 对象通过 subscribe 与 Observer 产生订阅关系;
2:subscribe 订阅关系是一条很重要的流程线,通过创建各种操作符下的 Observable对象,可以实现从:上游到下游的事件发射。此时创建订阅关系是由最后一个 Observable对象开始的,最后要给Observable来观察事件,所以事件订阅的流程就是:从下游到上游。
3:事件发生器(Emitter)
这也是Rxjava事件中的 :第三条线
1: Observer、Observable 分别和事件发生器(Emitter) 产生关联,并且通过回调来到事件发射现场Observable。
2: 根据产生的订阅链自上游到下游发布事件。
事件流可以自上而下进行下去,原因是 Observable 操作符 得到的还是 Observable,通过通过 Observable.subsribe 方法实现订阅关系。
4:基本使用
5: 代码示例
-
Observable.create(new ObservableOnSubscribe<Object>() {
-
@Override
-
public void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {
-
Log.d(TAG, "步骤二:发射事件");
-
observer.onNext("步骤二发射事件");
-
}
-
}).subscribe(new Observer<Object>() {
-
@Override
-
public void onSubscribe(@NonNull Disposable d) {
-
Log.d(TAG, "步骤一:不改变Disposable布尔值让其消费事件");
-
}
-
-
@Override
-
public void onNext(@NonNull Object o) {
-
Log.d(TAG, "步骤三消费:" o.toString());
-
}
-
-
@Override
-
public void onError(@NonNull Throwable e) {
-
-
}
-
-
@Override
-
public void onComplete() {
-
-
}
-
});
-
-
-
打印结果:
-
步骤一:不改变Disposable布尔值让其消费事件
-
步骤二:发射事件
-
步骤三消费:步骤二发射事件
6:源码分析:
1 :Observable.create(new ObservableOnSubscribe<Object> ())
实际上是:构造了 ObservableCreate对象,这个就是最初的发射事件对象
-
Observable.java
-
-
-
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
-
Objects.requireNonNull(source, "source is null");
-
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
-
}
2: Observable.subscribe(new Observer())
利用其多态的性质,此时Observable的实现类是 :步骤一的 ObservableCreate对象。随意subscribe(new Observer()) 最终会调用 ObservableCreate.subscribeAcutal() 函数
-
Observable.java
-
-
public final void subscribe(super T> observer) { Observer<?
-
Objects.requireNonNull(observer, "observer is null");
-
try {
-
-
observer = RxJavaPlugins.onSubscribe(this, observer);
-
-
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
-
-
// 利用多态的性质,调用 ObservableCreate的 subscribeAcutal(observer)函数
-
subscribeActual(observer);
-
} catch (NullPointerException e) { // NOPMD
-
throw e;
-
} catch (Throwable e) {
-
Exceptions.throwIfFatal(e);
-
// can't call onError because no way to know if a Disposable has been set or not
-
// can't call onSubscribe because the call might have set a Subscription already
-
RxJavaPlugins.onError(e);
-
-
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
-
npe.initCause(e);
-
throw npe;
-
}
-
}
3: ObservableCreate 会先调用 observer.onScbscribe(createEmitter) 函数,然后回调
source.subscribe(createEmitter) 函数,( 这里的source就是 ObservableOnSubscribe)
通过createEmitter 来发射事件
-
ObservableCreate.java
-
protected void subscribeActual(Observer<? super T> observer) {
-
CreateEmitter<T> parent = new CreateEmitter<>(observer);
-
-
// Observable发射事件后,先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件
-
observer.onSubscribe(parent);
-
-
try {
-
-
// 通过source来发射事件
-
source.subscribe(parent);
-
} catch (Throwable ex) {
-
Exceptions.throwIfFatal(ex);
-
parent.onError(ex);
-
}
-
}
-
-
-
Observable.create(new ObservableOnSubscribe<Object>() {
-
@Override
-
public void subscribe(@NonNull ObservableEmitter<Object> observer) throws Throwable {
-
Log.d(TAG, "步骤二:发射事件");
-
-
// 通过source来发射事件
-
observer.onNext("步骤二发射事件");
-
}
-
})
-
-
-
-
-
ObservableCreate.java
-
-
static final class CreateEmitter<T>
-
extends AtomicReference<Disposable>
-
implements ObservableEmitter<T>, Disposable {
-
-
private static final long serialVersionUID = -3434801548987643227L;
-
-
final Observer<? super T> observer;
-
-
CreateEmitter(Observer<? super T> observer) {
-
this.observer = observer;
-
}
-
-
// 呼应 上面: 通过source来发射事件 流程
-
@Override
-
public void onNext(T t) {
-
if (t == null) {
-
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
-
return;
-
}
-
-
// 呼应上面 : 先调用 Observer的 onSubscribe()函数,来决定后面是否消费事件
-
if (!isDisposed()) {
-
observer.onNext(t);
-
}
-
}
-
-
-
-
4 :如果在 Observer的 onSubscribe(Disposable) 中没有解除订阅流程,那么就可以让 消费者Observer来消费事件了
7:Schedulers: 指定线程
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgfkegf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13