• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RxJavajust、from方法的基本原理

武飞扬头像
cvKnight
帮助2

前面的关于create的文章已经介绍了,Observable被订阅时的流程。

一次简单的create订阅执行流程

接下来看一看just的源码。

just有多个重载方法,内部的实现有两种方式:

  • 通过ScalarSynchronousObservable
  • 间接调用from方法

ScalarSynchronousObservable

public static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

前面的文章介绍,最终会调用OnSubscribe的call方法,从下面的代码看到使用的是子类JustOnSubscribe,所以我们直接去看JustOnSubscribe。

public final class ScalarSynchronousObservable<T> extends Observable<T>{
	protected ScalarSynchronousObservable(final T t) {
	    super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
	    this.t = t;
	}
}

在JustOnSubscribe的call方法,调用了Subscriber的s.setProducer方法。

static final class JustOnSubscribe<T> implements OnSubscribe<T> {
    final T value;

    JustOnSubscribe(T value) {
        this.value = value;
    }

    @Override
    public void call(Subscriber<? super T> s) {
        s.setProducer(createProducer(s, value));
    }
}

public void setProducer(Producer p) {
   long toRequest;
   ...
   ...
   内部最终调用的是Producer的request方法
   producer.request(Long.MAX_VALUE);
}

static <T> Producer createProducer(Subscriber<? super T> s, T v) {
    if (STRONG_MODE) {
        return new SingleProducer<T>(s, v);
    }
    return new WeakSingleProducer<T>(s, v);
}

学新通

最后追踪到WeakSingleProducer,可见其中的request方法调用的Subscriber的onNext和onComplete方法。

static final class WeakSingleProducer<T> implements Producer {
    final Subscriber<? super T> actual;
    final T value;
    boolean once;

    public WeakSingleProducer(Subscriber<? super T> actual, T value) {
        this.actual = actual;
        this.value = value;
    }

    @Override
    public void request(long n) {
		...
        T v = value;
        Subscriber<? super T> a = actual;
        ...
        a.onNext(v);
        a.onCompleted();
    }
}
学新通

总结

回顾create方法的使用:

Observable.create(new Observable.OnSubscribe<Integer>() {
     @Override
     public void call(Subscriber<? super Integer> subscriber) {
         subscriber.onNext(1);
         subscriber.onCompleted();
     }
 });

我们在OnSubscribecall方法中直接调用了SubscriberonNextonCompleted方法,而JustOnSubscribe中并没有这么做,它把具体的执行流程交给了ProducerWeakSingleProducer)。

just的其他重载方法,间接调用的from方法,在from的源码内,也是按照上述做的。

from

public static <T> Observable<T> just(T t1, T t2) {
    return from((T[])new Object[] { t1, t2 });
}
public static <T> Observable<T> from(T[] array) {
    return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
}
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {

    final Subscriber<? super T> child;
    final T[] array;

    int index;

    public FromArrayProducer(Subscriber<? super T> child, T[] array) {
        this.child = child;
        this.array = array;
    }

    @Override
    public void request(long n) {

        if (n == Long.MAX_VALUE) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                fastPath();
            }
        } else
        if (n != 0) {
            if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                slowPath(n);
            }
        }
    }

    void fastPath() {
		省略部分代码
	          child.onNext(t);
		省略部分代码
	          child.onCompleted();
    }

    void slowPath(long r) {
    	省略部分代码
	          child.onNext(t);
		省略部分代码
	          child.onCompleted();
    }
}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbkaak
系列文章
更多 icon
同类精品
更多 icon
继续加载