• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Androidrxjava使用和源码

武飞扬头像
淼森007
帮助1

目录

1.rxjava从原理是基于一种扩展观察者模式。

2.扩展观察者模式当中有4个关键角色

3.rxjava本质原理

4.创建rxjava可以分为三个步骤

5.rxjava使用方法

6.rxjava使用总结 

7.轮询的定义

8.相比轮询,长连接的缺点

9.使用Handler实现轮询方法

10.使用rxjava实现轮询的网络请求

11.缓存策略

12.为什么删除缓存?

13.LRU核心思想

14.LruCache

15.LruCache类源码分析

16.Rxjava是如何实现缓存的


1.rxjava从原理是基于一种扩展观察者模式。

2.扩展观察者模式当中有4个关键角色

<1>观察者。观察者它是用来接收事件,并基于事件作出响应动作的一个角色。

<2>被观察者。被观察者它是用于生产事件的。它生产事件会交给观察者,观察者会根据响应作出不同的动作。

<3>订阅。这个角色是用于连接被观察者和观察者之间的。

<4>事件。被观察者和观察者之间沟通的载体。

3.rxjava本质原理

当被观察者Observable通过订阅subscribe这个方法,按顺序发送事件给观察者Observer的时候,这时观察者Observer它会根据我们接收到事件顺序,依次做出不同的响应动作。我们在这个动作当中可以做不同的业务需求。

4.创建rxjava可以分为三个步骤

<1>创建被观察者(Observable),被观察者要生产事件来交给观察者进行接收。

<2>创建观察者(Observer),观察者会根据接收到观察者发送的事件,来去定义响应事件的动作和行为。

<3>通过订阅(Subscribe)连接观察者和被观察者。

5.rxjava使用方法

Step 1

  1.  
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  2.  
    @Override
  3.  
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
  4.  
    emitter.onNext(1);
  5.  
    emitter.onNext(2);
  6.  
    emitter.onNext(3);
  7.  
    emitter.onComplete();
  8.  
    }
  9.  
    });

首先我们要创建被观察者Observable。Observable通过create方法生产事件。而create是rxjava当中最基本的一个创建事件序列的方法。在这个方法当中,我们传入了一个OnSubscribe这个对象。也就说当我们这个被观察者被订阅的时候,它会调用onSubscribe当中的subscribe方法,来去依次地被触发。这其实就是一种观察者模式。也就是说被观察者它是被观察者在那边观察的。一旦被观察者有事件的改变,观察者就会通知其他的去做出不同的响应。所以说这是一种扩展的观察者模式。还要注意的是,我们要去复写subscribe方法。subscribe这个方法里面,我们要去定义发送事件。如何定义发送事件?它其实就是通过ObservableEmitter这个观察者的发射类来进行的。这个类其实就是事件发射器的类,在这个类当中我们可以去定义需要发送的事件。同时我们还可以向观察者发送事件。

Step 2

方法一:

  1.  
    Observer<Integer> observer = new Observer<Integer>() {
  2.  
    @Override
  3.  
    public void onSubscribe(@NonNull Disposable d) {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    @Override
  8.  
    public void onNext(@NonNull Integer integer) {
  9.  
     
  10.  
    }
  11.  
     
  12.  
    @Override
  13.  
    public void onError(@NonNull Throwable e) {
  14.  
     
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onComplete() {
  19.  
     
  20.  
    }
  21.  
    };
学新通

创建观察者有两种方式,第一种就是上述代码所展示的。在创建观察者的时候,还要去定义响应事件的行为。各回调方法说明如下:

onSubscribe:观察者接收事件前,默认最先调用复写onSubscribe()

onNext:当被观察者生产Next事件,并且观察者接收到时,会调用该复写方法进行响应。

onError:当被观察者生产Error事件,并且观察者接收到时,会调用该复写方法进行响应。 

onComplete:当被观察者生产Complete事件,并且观察者接收到时,会调用该复写方法进行响应。

方法二:

  1.  
    Subscriber<String> subscriber = new Subscriber<String>() {
  2.  
    @Override
  3.  
    public void onSubscribe(Subscription s) {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    @Override
  8.  
    public void onNext(String s) {
  9.  
     
  10.  
    }
  11.  
     
  12.  
    @Override
  13.  
    public void onError(Throwable t) {
  14.  
     
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onComplete() {
  19.  
     
  20.  
    }
  21.  
    };
学新通

第二种subscribe,它其实是rxjava内置的实现了Observer的抽象类。这个抽象类的创建过程与Observer类似。Subscribe这个抽象类是对Observer接口进行了扩展。 这个类多了两个方法。一个是onStart(),它在还未响应事件前会调用,做一些初始化的工作;还有就是unsubscribe这个方法。它用于去取消订阅。就说一旦观察者订阅了被观察者之后,它可以去取消订阅。那么这个方法调用之后,观察者就不会再接收到被观察者发送过来的响应事件了。

Step 3

observable.subscribe(observer);

第三步就是订阅,它就是调用subscribe这个方法,就完成了观察者、被观察者的调用。

6.rxjava使用总结 

<1>创建被观察者(Observable)&生产事件

<2>创建观察者(Observer)并定义响应事件的行为

<3>通过订阅(Subscribe)连接观察者和被观察者

7.轮询的定义

APP端每隔一定的时间重复请求的操作。

8.相比轮询,长连接的缺点

长连接也可以完成类似轮询的需求,但这个长连接并不是稳定可靠的。我们在轮询操作的时候一般都需要稳定的网络请求。而轮询操作相比长连接它是有生命周期的。就是说轮询是在一定的生命周期内去执行完成的。而我们的长连接它是要跨整个进程生命周期的。所以说这两者是有区别的。

9.使用Handler实现轮询方法

  1.  
    private static Handler loopRequestHandler = new Handler(){
  2.  
    @Override
  3.  
    public void handleMessage(Message msg) {
  4.  
    if (msg.what == LOOP_WHAT) {
  5.  
    dosomething();
  6.  
    loopRequestHandler.removeMessages(LOOP_WHAT);
  7.  
    System.gc();
  8.  
    loopRequestHandler.sendEmptyMessageDelayed(LOOP_WHAT, 2000);
  9.  
    }
  10.  
    }
  11.  
    };

该方法其实就是使用了延迟执行的原理,使Handler内的方法每隔两秒执行一次。其中dosomething()方法是我们自己要执行的轮询方法。

10.使用rxjava实现轮询的网络请求

<1>创建描述网络请求的接口

  1.  
    public class retrofit_interface {
  2.  
    @GET("部分URL")
  3.  
    Call<Person> getCall();
  4.  
    }

<2>发送网络请求 

  1.  
    Observable.interval(2, 1, TimeUnit.SECONDS)
  2.  
     
  3.  
    /*
  4.  
    * 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
  5.  
    * 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
  6.  
    **/
  7.  
    .doOnNext(new Consumer<Long>() {
  8.  
    @Override
  9.  
    public void accept(Long integer) throws Exception {
  10.  
     
  11.  
    /*
  12.  
    * 步骤3:通过Retrofit发送网络请求
  13.  
    **/
  14.  
    // a. 创建Retrofit对象
  15.  
    Retrofit retrofit = new Retrofit.Builder()
  16.  
    .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
  17.  
    .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
  18.  
    .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) // 支持RxJava
  19.  
    .build();
  20.  
     
  21.  
    // b. 创建 网络请求接口 的实例
  22.  
    LoopRequest_interface request = retrofit.create(LoopRequest_interface.class);
  23.  
     
  24.  
    // c. 采用Observable<...>形式 对 网络请求 进行封装
  25.  
    Observable<Person> observable = request.getCall();
  26.  
    // d. 通过线程切换发送网络请求
  27.  
    observable.subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
  28.  
    .observeOn(Schedulers.newThread()) // 切换回到主线程 处理请求结果
  29.  
    .subscribe(new Observer<Person>() {
  30.  
    @Override
  31.  
    public void onSubscribe(Disposable d) {
  32.  
    }
  33.  
     
  34.  
    @Override
  35.  
    public void onNext(Person result) {
  36.  
    // e.接收服务器返回的数据
  37.  
    }
  38.  
     
  39.  
    @Override
  40.  
    public void onError(Throwable e) {
  41.  
    }
  42.  
     
  43.  
    @Override
  44.  
    public void onComplete() {
  45.  
     
  46.  
    }
  47.  
    });
  48.  
     
  49.  
    }
  50.  
    })
  51.  
    .subscribe(new Observer<Long>() {
  52.  
    @Override
  53.  
    public void onSubscribe(Disposable d) {
  54.  
     
  55.  
    }
  56.  
     
  57.  
    @Override
  58.  
    public void onNext(Long value) {
  59.  
     
  60.  
    }
  61.  
     
  62.  
    @Override
  63.  
    public void onError(Throwable e) {
  64.  
     
  65.  
    }
  66.  
     
  67.  
    @Override
  68.  
    public void onComplete() {
  69.  
     
  70.  
    }
  71.  
    });
学新通

代码中我们调用被观察者interval方法,interval就是延迟发送的一个方法。这里展示的是无限次轮询,而不是有限次轮询。如果想使用有限次轮询,要调用intervalRange这个方法。

这里注意一下interval的三个参数。第一个参数表示第一次延迟的时间。第二个参数表示间隔时间的数字。第三个参数是时间单位。这里的参数设置就表示,我延迟2秒发送事件,同时每隔1秒产生一个数字,从0开始递增,而且我们是无限的轮询。这是第一步。

第二步,我们每次发送数字前,都要发送网络请求。这里的doOnNext回调就是在执行next事件前调用。也就说每隔1秒产生一个数字前,就发送一个网络请求。从而就实现了我们的轮询需求。

第三步就是利用Retrofit进行网络请求。首先要创建一个Retrofit对象,然后通过build构建者模式来创建Retrofit。通过baseurl来设置网络请求路径。调用ConverterFactory设置json解析器。通过适配设置rxjava可适配。这就完成了第一步。第二步我们要创建网络请求接口实例,就是通过Retrofit.create()方法。第三步我们要对我们的网络请求进行封装。第四步是不一样的地方,通过rxjava的线程切换,来进行发送网络请求的任务。首先我们要调用subscribeOn表示我们切换到子线程当中进行网络请求。然后调用observeOn表示我们切换到主线程来处理我们请求的结果。

11.缓存策略

一般来说Android的缓存策略其实主要包含缓存的添加、获取和删除这三类操作。

12.为什么删除缓存?

不论内存缓存还是硬盘缓存,它的缓存大小都是有上限的。当你的缓存存满之后,如果你想再继续添加缓存,这时候你务必要去删除一些旧的缓存,然后才能去增加新的缓存。

13.LRU核心思想

当缓存存满的时候,它会优先淘汰一部分的缓存对象,即近期最少使用的缓存对象。

14.LruCache

Android3.1以后提供的缓存类。

15.LruCache类源码分析

<1>LruCache是一个泛型类,主要的算法原理就是把最近使用的那些对象,用强引用的形式存储在我们一个HashMap当中。而这个HashMap它是LinkedHashMap。还有在这里说的强引用,就是我们平时所使用的最正常的一种引用方式。当这个缓存满的时候,它会把最近最少使用的对象从内存当中移除。所以它提供了一个put方法和一个get方法来进行缓存的添加和获取。

<2>核心思想就是维持了一个缓存对象列表,即LinkedHashMap,然后里边的列表的排列方式是按访问的顺序实现的。所以说一直没有访问对象它就会处在整个队的队尾,所以你要删除时就会从队尾把它删除掉。

<3>构造函数

  1.  
    public LruCache(int maxSize) {
  2.  
    if (maxSize <= 0) {
  3.  
    throw new IllegalArgumentException("maxSize <= 0");
  4.  
    }
  5.  
    this.maxSize = maxSize;
  6.  
    this.map = new LinkedHashMap<K, V>(0, 0.75f, true);
  7.  
    }

在它的构造函数中实现了一个LinkedHashMap,所以说LruCache它的整体的那些逻辑原理都是通过LinkedHashMap来实现的。

<4>put方法

  1.  
    @Nullable
  2.  
    public final V put(@NonNull K key, @NonNull V value) {
  3.  
    if (key == null || value == null) {
  4.  
    throw new NullPointerException("key == null || value == null");
  5.  
    }
  6.  
     
  7.  
    V previous;
  8.  
    synchronized (this) {
  9.  
    putCount ;
  10.  
    size = safeSizeOf(key, value);
  11.  
    previous = map.put(key, value);
  12.  
    if (previous != null) {
  13.  
    size -= safeSizeOf(key, previous);
  14.  
    }
  15.  
    }
  16.  
     
  17.  
    if (previous != null) {
  18.  
    entryRemoved(false, key, previous, value);
  19.  
    }
  20.  
     
  21.  
    trimToSize(maxSize);
  22.  
    return previous;
  23.  
    }
学新通

我们对如下代码进行分析

  1.  
    if (key == null || value == null) {
  2.  
    throw new NullPointerException("key == null || value == null");
  3.  
    }

这里会判断一下key和value是否为空,为空的话它会抛出异常。

  1.  
    synchronized (this) {
  2.  
    putCount ;
  3.  
     
  4.  
    ...
  5.  
     
  6.  
    }

putCount为插入的缓存对象值,在这个同步代码块中,会将插入的缓存对象值 1。

  1.  
    synchronized (this) {
  2.  
     
  3.  
    ...
  4.  
     
  5.  
    size = safeSizeOf(key, value);
  6.  
     
  7.  
    ...
  8.  
     
  9.  
    }

 增加已有的缓存大小。

  1.  
    synchronized (this) {
  2.  
     
  3.  
    ...
  4.  
     
  5.  
    previous = map.put(key, value);
  6.  
     
  7.  
    ...
  8.  
     
  9.  
    }

它会向这个map当中加入缓存对象。

  1.  
    synchronized (this) {
  2.  
     
  3.  
    ...
  4.  
     
  5.  
    if (previous != null) {
  6.  
    size -= safeSizeOf(key, previous);
  7.  
    }
  8.  
    }

 如果已有缓存对象,那么缓存大小一定要恢复到之前。

entryRemoved(false, key, previous, value);

 这个方法是个空方法,我们可以自己去实现。

trimToSize(maxSize);

这个方法是调整缓存大小所使用的。

可以看到,put方法并没有什么太多难点,只是一些数据结构的操作。重要的是,在添加了缓存之后,我们要调用trimToSize方法,来判断缓存是否已满,满了就删除近期最少使用的算法。

 <5>trimToSize方法

  1.  
    public void trimToSize(int maxSize) {
  2.  
    while (true) {
  3.  
    K key;
  4.  
    V value;
  5.  
    synchronized (this) {
  6.  
    if (size < 0 || (map.isEmpty() && size != 0)) {
  7.  
    throw new IllegalStateException(getClass().getName()
  8.  
    ".sizeOf() is reporting inconsistent results!");
  9.  
    }
  10.  
     
  11.  
    if (size <= maxSize || map.isEmpty()) {
  12.  
    break;
  13.  
    }
  14.  
     
  15.  
    Map.Entry<K, V> toEvict = map.entrySet().iterator().next();
  16.  
    key = toEvict.getKey();
  17.  
    value = toEvict.getValue();
  18.  
    map.remove(key);
  19.  
    size -= safeSizeOf(key, value);
  20.  
    evictionCount ;
  21.  
    }
  22.  
    entryRemoved(true, key, value, null);
  23.  
    }
  24.  
    }
学新通

 我们对如下代码进行分析

  1.  
    if (size < 0 || (map.isEmpty() && size != 0)) {
  2.  
     
  3.  
    ...
  4.  
     
  5.  
    }

如果这个map是为空的,并且这个size不等于0,或者size小于0的时候,这时会抛出异常。

  1.  
    if (size <= maxSize || map.isEmpty()) {
  2.  
    break;
  3.  
    }

如果size小于我们的最大缓存,那就意味着不需要再删除缓存对象了,这时通过break跳出整个循环。

Map.Entry<K, V> toEvict = map.entrySet().iterator().next();

用迭代器获取队尾的那个对象。这个队尾的对象就是那个近期最少访问的元素。

map.remove(key);

这里会调用remove把这个元素删除掉。

总结:trimToSize这个方法其实它就是通过LinkedHashMap不断地去删除它队尾的元素,把最近最少访问的那个元素删除掉。

<6>所以LrcCache算法就是在内部维护了一个LinkedHashMap,这个LinkedHashMap它是按照顺序来排序的。所以说你调用存储它的put方法,它就会结合添加元素的数量判断是否存满,存满了就会删除。这就是LruCache缓存算法的原理。

16.Rxjava是如何实现缓存的

<1>设置一个Observable:检查内存缓存是否有该数据的缓存

  1.  
    String memoryCache = null;
  2.  
     
  3.  
    /*
  4.  
    * 设置第1个Observable:检查内存缓存是否有该数据的缓存
  5.  
    **/
  6.  
    Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
  7.  
    @Override
  8.  
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  9.  
     
  10.  
    // 先判断内存缓存有无数据
  11.  
    if (memoryCache != null) {
  12.  
    // 若有该数据,则发送
  13.  
    emitter.onNext(memoryCache);
  14.  
    } else {
  15.  
    // 若无该数据,则直接发送结束事件
  16.  
    emitter.onComplete();
  17.  
    }
  18.  
     
  19.  
    }
  20.  
    });
学新通

这个Observable它的作用是检查我们内存缓存当中是否有该数据的缓存。这只是我们模拟的一种读取的场景。我们创建一个Observable被观察者还是调用create方法。create是在我们rxjava当中最基本的一种创造事件序列的方法。在create方法内部我们传入了一个OnSubscribe对象。当我们的Observable被观察者被观察者调用的时候,这个OnSubscribe对象中的subscribe方法就会自动被调用。那么我们所要写的事件序列就会按照次序来依次触发。在这个subscribe内部,我们首先会判断内存缓存有没有数据,这里直接判断memoryCache是否为空,如果有数据就调用发射器的onNext方法;如果没有该数据,那么就调用发射器的onComplete来表示这个事件结束了。

<2>接着设置第二个Observable,检查磁盘缓存是否有该数据的缓存

  1.  
    String diskCache = "从磁盘缓存中获取数据";
  2.  
     
  3.  
    /*
  4.  
    * 设置第2个Observable:检查磁盘缓存是否有该数据的缓存
  5.  
    **/
  6.  
    Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
  7.  
    @Override
  8.  
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  9.  
     
  10.  
    // 先判断磁盘缓存有无数据
  11.  
    if (diskCache != null) {
  12.  
    // 若有该数据,则发送
  13.  
    emitter.onNext(diskCache);
  14.  
    } else {
  15.  
    // 若无该数据,则直接发送结束事件
  16.  
    emitter.onComplete();
  17.  
    }
  18.  
     
  19.  
    }
  20.  
    });
学新通

这个Observable它是检查我们磁盘缓存是否有该数据的缓存。它的创建方法也是通过create方法,在create方法内部会传入OnSuscribe这个对象。当我们第二个被观察者被调用的时候,它就会调用subscribe,按照顺序被我们依次触发。它的逻辑与内存缓存类似。它首先会判断磁盘缓存有无数据,通过字符串是否为空来判断。如果有数据它就调用发射器的onNext方法,并将我们字符串传入内部;没有数据就直接调用onComplete方法。

<3>第三个Observable是通过网络获取数据

Observable<String> network = Observable.just("从网络中获取数据");

<4>合并三个被观察者对象

  1.  
    Observable.concat(memory, disk, network)
  2.  
    // 2. 通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
  3.  
    .firstElement()
  4.  
     
  5.  
    // 3. 观察者订阅
  6.  
    .subscribe(new Consumer<String>() {
  7.  
    @Override
  8.  
    public void accept(String s) throws Exception {
  9.  
    Log.d("cache", "最终获取的数据来源 = " s);
  10.  
    }
  11.  
    });

首先我们通过concat合并memory、disk、network这三个被观察者事件。它就是我们检查内存缓存、磁盘缓存和发送网络请求三个事件。然后它会将这三个顺序串连成队列。接下来调用firstElement()方法,从我们前面创建好的串联队列当中,去取出并发送第一个有效事件,也就是next事件。这时候它就会依次判断然后检查memory、disk、network。

该例子逻辑如下:

首先调用firstElement,它就会取出第一个事件memory。那么这时候它就会判断内存缓存当中是否有数据缓存,如果有的话,也就是这个memory如果是不为空的话,那么它就可以直接读取啦,而不会进行下面的操作。

但是如果这个memory是空的话,就表示我们内存缓存当中没有数据。那么这时候它就会往下取,取到第二个,也就是disk,来判断磁盘缓存当中是否有数据缓存。如果这时候磁盘缓存不为空,这时候就会调用next事件,被我们的被观察者监听到,来依次进行响应。所以说它会依次进行判断。最后我们调用subscribe来完成我们观察者的订阅。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfiibec
系列文章
更多 icon
同类精品
更多 icon
继续加载