• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RxJava使用和原理

武飞扬头像
菜籽同学
帮助1

一、事件分发流程:

常规创建Observable观察者:

  1.  
    Observable.create(new ObservableOnSubscribe<String>() {
  2.  
    @Override
  3.  
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
  4.  
    emitter.onNext("");
  5.  
    emitter.onComplete();
  6.  
    }
  7.  
    }).flatMap(new Function<String, ObservableSource<String>>() {
  8.  
    @Override
  9.  
    public ObservableSource<String> apply(@NonNull String s) throws Exception {
  10.  
    System.out.println("我们这里需要重写的方法");
  11.  
    return Observable.just("");
  12.  
    }
  13.  
    }).observeOn(Schedulers.io())
  14.  
    .subscribeOn(Schedulers.newThread())
  15.  
    .subscribe(new Observer<String>() {
  16.  
    @Override
  17.  
    public void onSubscribe(@NonNull Disposable d) {
  18.  
    System.out.println("方法一:onSubscribe");
  19.  
    }
  20.  
     
  21.  
    @Override
  22.  
    public void onNext(@NonNull String s) {
  23.  
    System.out.println("方法二:onNext");
  24.  
    }
  25.  
     
  26.  
    @Override
  27.  
    public void onError(@NonNull Throwable e) {
  28.  
    System.out.println("方法三:onError");
  29.  
    }
  30.  
     
  31.  
    @Override
  32.  
    public void onComplete() {
  33.  
    System.out.println("方法四:onComplete");
  34.  
    }
  35.  
    });
  36.  
    }
学新通

从Observable.create()点进去,通过一个静态方法,追踪到

  1.  
    public final class ObservableCreate<T> extends Observable<T> {
  2.  
    final ObservableOnSubscribe<T> source;
  3.  
     
  4.  
    public ObservableCreate(ObservableOnSubscribe<T> source) {
  5.  
    this.source = source;
  6.  
    }
  7.  
     
  8.  
    }

这里构造方法中保存的source就是外部传进来的ObservableOnSubscribe,存储下来后,当外部调用subscribe时,会执行到Observable的subscribeActual方法,而ObservableCreate实现了该方法,

  1.  
    @Override
  2.  
    protected void subscribeActual(Observer<? super T> observer) {
  3.  
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
  4.  
    observer.onSubscribe(parent);
  5.  
     
  6.  
    try {
  7.  
    source.subscribe(parent);
  8.  
    } catch (Throwable ex) {
  9.  
    Exceptions.throwIfFatal(ex);
  10.  
    parent.onError(ex);
  11.  
    }
  12.  
    }

这里做了三件事情:

1、创建分发器CreateEmitter对象,

2、回调生命周期给观察者,

3、执行subscribe方法,并传入CreateEmitter对象,

解释一:创建CreateEmitter用来把对象句柄传递给外部使用,同时把观察者和被观察者关联起来,这里的CreateEmitter就是被观察者,而Observer对象则属于观察者

解释二:这里的observer.onSubscribe(parent),则是执行了 “System.out.println("方法一:onSubscribe")”,在整个处理流开始执行之前,把生命周期回调给了订阅者

解释三:通过source.subscribe(parent),把分发器传递给被观察着用来发送事件,这里的subscribe,就是调用了外部传入的ObservableOnSubscribe对象,通知被观察者开始发送事件,这里会执行到”System.out.println("开始发送事件")“,我们可以通过subscribe中传递过来的emitter对象进行事件的分发处理

当我们调用emitter.onNext("")时,可以理解为被观察者发送了一个事件出来,因为CreateEmitter实现了ObservableEmitter,因此这里会回调到CreateEmitter的onNext方法,代码如下:

  1.  
    static final class CreateEmitter<T>
  2.  
    extends AtomicReference<Disposable>
  3.  
    implements ObservableEmitter<T>, Disposable {
  4.  
     
  5.  
    private static final long serialVersionUID = -3434801548987643227L;
  6.  
     
  7.  
    final Observer<? super T> observer;
  8.  
     
  9.  
    CreateEmitter(Observer<? super T> observer) {
  10.  
    this.observer = observer;
  11.  
    }
  12.  
     
  13.  
     
  14.  
    // 会回调到这里
  15.  
    @Override
  16.  
    public void onNext(T t) {
  17.  
    if (t == null) {
  18.  
    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
  19.  
    return;
  20.  
    }
  21.  
    if (!isDisposed()) {
  22.  
    observer.onNext(t);
  23.  
    }
  24.  
    }
学新通

在onNext内部,CreateEmitter会调用观察者对象的onNext方法,把事件(也就是数据)传递给观察者,也就是会执行到前面demo中的System.out.println("方法二:onNext");方法,同时我们也可以从代码中看到,当判空时,会执行到onError()方法,这里同样会调用观察者的onError()方法,把事件传递出去

二、订阅与取消

在CreateEmitter的每次回调中,我们都可以看到isDisposed()的判断,这个判断是用来检测观察者是否已取消订阅。如果观察者取消订阅的话,那么就不会把执行结果通知到观察者,这里的典型使用场景:在activity中请求网络数据,当数据请求回来后刷新界面,但是如果数据请求回来前用户手动关闭了activity的话。当数据请求回来后,因为activity内部的view已经被销毁,会出现控件报空指针的问题,在这里我们可以通过dispose()方法取消事件的回调传递,这里涉及到一个类:DisposableHelper

  1.  
    public enum DisposableHelper implements Disposable {
  2.  
    /**
  3.  
    * The singleton instance representing a terminal, disposed state, don't leak it.
  4.  
    */
  5.  
    DISPOSED
  6.  
    ;
  7.  
     
  8.  
    /**
  9.  
    * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
  10.  
    * @param d the disposable to check
  11.  
    * @return true if d is {@link #DISPOSED}
  12.  
    */
  13.  
    public static boolean isDisposed(Disposable d) {
  14.  
    return d == DISPOSED;
  15.  
    }
  16.  
    }
学新通

DisposableHelper是一个枚举类,有一个DISPOSED枚举类型,用来记录观察者当前的状态,当执订阅者执行,代码如下:

  1.  
    @Override
  2.  
    public void dispose() {
  3.  
    DisposableHelper.dispose(this);
  4.  
    }

其内部实现为:

  1.  
    public static boolean dispose(AtomicReference<Disposable> field) {
  2.  
    Disposable current = field.get();
  3.  
    Disposable d = DISPOSED;
  4.  
    if (current != d) {
  5.  
    current = field.getAndSet(d);
  6.  
    if (current != d) {
  7.  
    if (current != null) {
  8.  
    current.dispose();
  9.  
    }
  10.  
    return true;
  11.  
    }
  12.  
    }
  13.  
    return false;
  14.  
    }

通过这里把观察者的状态设置为DISPOSED,用来标志观察者已取消订阅,在被观察者每次执行生命周期时,通过对观察者的状态判断,用以确定是否需要把数据回调给被观察者

三、线程调度:

这里通过observeOn举例说明:

  1.  
    @CheckReturnValue
  2.  
    @SchedulerSupport(SchedulerSupport.CUSTOM)
  3.  
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  4.  
    // 省略非关键代码
  5.  
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  6.  
    }

ObservableObserveOn实现了AbstractObservableWithUpstream,进而实现了Observable

这里可以追踪到ObservableObserveOn的源码中,可以看到:

  1.  
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
  2.  
    super(source);
  3.  
    // 省略非关键代码
  4.  
    }

它的构造器调用了super(),而super内部则保存了前一个观察者对象:

  1.  
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
  2.  
    AbstractObservableWithUpstream(ObservableSource<T> source) {
  3.  
    this.source = source;
  4.  
    }
  5.  
    }

当我们调用subcribe()时,内部会执行到ObservableObserveOn如下方法:

  1.  
    @Override
  2.  
    protected void subscribeActual(Observer<? super T> observer) {
  3.  
    // 这里的if用来判断是否调度为当前线程,如果是,则不需要调度器来切换新线程
  4.  
    if (scheduler instanceof TrampolineScheduler) {
  5.  
    source.subscribe(observer);
  6.  
    } else {
  7.  
    Scheduler.Worker w = scheduler.createWorker();
  8.  
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  9.  
    }
  10.  
    }

通过 scheduler.createWorker()来切换到指定的线程,此时外部调用onNext()时,会执行到这里,在指定线程进行处理:

  1.  
    @Override
  2.  
    public void onNext(T t) {
  3.  
    if (done) {
  4.  
    return;
  5.  
    }
  6.  
     
  7.  
    if (sourceMode != QueueDisposable.ASYNC) {
  8.  
    queue.offer(t);
  9.  
    }
  10.  
    schedule();
  11.  
    }
  12.  
     
  13.  
    // 省略非关键代码
  14.  
     
  15.  
    void schedule() {
  16.  
    if (getAndIncrement() == 0) {
  17.  
    worker.schedule(this);
  18.  
    }
  19.  
    }
学新通

schedule是一个抽象方法,我们选择一个具体实现NewThreadWorker,这个NewThreadWorker是用来切换新线程的,我们定位到它的具体实现,看下内部是怎么处理的:

  1.  
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
  2.  
    // 省略非关键代码
  3.  
    try {
  4.  
    if (delayTime <= 0) {
  5.  
    f = executor.submit((Callable<Object>)sr);
  6.  
    } else {
  7.  
    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
  8.  
    }
  9.  
    sr.setFuture(f);
  10.  
    } catch (RejectedExecutionException ex) {
  11.  
    // 省略非关键代码
  12.  
    }
  13.  
    return sr;
  14.  
    }

ObserveOnObserver继承了Runnable,这个executor.submit((Callable<Object>)sr);就是线程池中用来提交任务的方法,线程池内部会调用runnable的run方法,从而调用到:

  1.  
    @Override
  2.  
    public void run() {
  3.  
    if (outputFused) {
  4.  
    drainFused();
  5.  
    } else {
  6.  
    drainNormal();
  7.  
    }
  8.  
    }

接着:

  1.  
    void drainNormal() {
  2.  
    // 省略非关键代码
  3.  
    for (;;) {
  4.  
    if (checkTerminated(done, q.isEmpty(), a)) {
  5.  
    return;
  6.  
    }
  7.  
     
  8.  
    // 省略非关键代码
  9.  
    a.onNext(v);
  10.  
     
  11.  
    missed = addAndGet(-missed);
  12.  
    if (missed == 0) {
  13.  
    break;
  14.  
    }
  15.  
    }
  16.  
    }
学新通

实现生命周期的回调,类似的,其它生命周期回调也是同样的流程

补充一下线程调度的各参数说明:

参数类型 解释 使用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 耗时等操作
Schedulers.io() io操作线程 网络请求、读写文件等io密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

四、流式转换:

当我们调用flatMap()时,经过重重方法重载,最终会走到这里

  1.  
    @CheckReturnValue
  2.  
    @SchedulerSupport(SchedulerSupport.NONE)
  3.  
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
  4.  
    // 省略非关键代码
  5.  
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
  6.  
    }

我们点击去ObservableFlatMap,可以看到,它最终也是实现了Observable类,那么前面分析过,当外部调用subscribe()时,会到这里:

  1.  
    @Override
  2.  
    public void subscribeActual(Observer<? super U> t) {
  3.  
    // 省略非关键代码
  4.  
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
  5.  
    }

我们从MergeObserver类追进去,可以看到,当外部调用onNext()时,会走到这里:

  1.  
    @Override
  2.  
    public void onNext(T t) {
  3.  
    // 省略非关键代码
  4.  
    try {
  5.  
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
  6.  
    } catch (Throwable e) {
  7.  
    // 省略非关键代码
  8.  
    }
  9.  
    }

重点在mapper.apply(t)这一行,点击去可以看到这是一个抽象方法,那么它的具体实现在哪里呢?一步一步的回溯,发现这个mapper就是我们外部传入的new Function<String, ObservableSource<String>>(),那么这个apply就是外部我们需要重写的方法,也就是这个打印这个log的位置:System.out.println("我们这里需要重写的方法");至此,整个回路追踪可以完整串联起来

解释:其它省略的方法内容,因为flatMap可以执行多个,省略的代码就是循环执行的分发流程,我们只追寻主要流程,这些分支流程没有详细区分

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcccbf
系列文章
更多 icon
同类精品
更多 icon
继续加载