• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

rxjava理解

武飞扬头像
想入非飞
帮助1

简单介绍

rxjava中有Flowable、Maybe、Observable、Single、Completable 5种使用途径,其中Flowable实现reactivestreams接口,是jdk9标准接口,支持背压、并发模式。
反应式编程特点就是在订阅时才执行具体业务代码。

接口介绍

Publisher消息提供发布者,提供订阅方法,发布者定义好后,消费者通过订阅通知发布者可以准备消息了。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber消息订阅者,提供4个方法

public interface Subscriber<T> {
    /**
     * 发布者可通过该方法,通知下游订阅者收到订阅了,可以通过Subscription 取消或者请求消息。
     */
    public void onSubscribe(Subscription s);

    /**
     *  发布者被订阅或被Subscription请求时,触发订阅者的onNext方法,发布消息。
     */
    public void onNext(T t);

    /**
     * 执行业务过程中发生异常,通知消费者,一般发生异常会中断后续onNext流程
     */
    public void onError(Throwable t);

    /**
     * 通知下游业务执行完成
     */
    public void onComplete();
}
学新通

Subscription提供reqeust和cancel 2个方法

public interface Subscription {
    /**
     *  让上游发布消息
     */
    public void request(long n);

    /**
     * 取消消息推送
     */
    public void cancel();
}

Processor继承Subscriber、Publisher。

Flowable递归调用

便于链式调用,所有Flowable相关的类继承Flowable,实现subscribeActual,在Flowable中融合所有类的调度方法。

protected abstract void subscribeActual(@NonNull Subscriber<@NonNull ? super T> subscriber);

链式调用结构

学新通
左边先调用的发布者在最里面,右边订阅者先调用的在最外面,因为先执行构造定义,在通过订阅串联所有发布者。
Rx多数代码的发布者和订阅者在同一个类中实现,发布者被订阅(subscribe)时,通过包裹订阅者接收上游的onSubscribe,可实现并发、缓存、重试、条件判断等功能。这样就构成,先链式调用的发布者,后被订阅,先收到onSubscribe。

部分代码理解

假设都是链式调用,先被调用的方法(发布者方法)代表前面,后被调用的方法代表后面。发布者称为上游,订阅者称为下游。

线程切换

需要区分Flowable和ParallelFlowable,前者用于单线程,后者用于多线程,多线程和单线程有方法可以动态切换。

  • Flowable的subscribeOn和observeOn

subscribeOn 表示,首次被调用起到最上游的request请求和所有订阅者的onNext方法都由该线程执行。
学新通
构造函数的入参source为先链式调用的上游。
学新通
切换线程后执行上游的订阅,触发上游调用onSubscribe、onNext等方法。如果上游不自动推送消息下来,在上游调用onSubscribe时,触发上游的reqeuest,使上游推送消息到该类。
学新通
学新通
学新通
onNext直接转发数据。

observeOn 表示,从被调用起后续所有的onNext方法都由该线程执行。
学新通
学新通
学新通
学新通
学新通
学新通

总结 从上面可以看出,subscribeOn不论在哪里调用,可以作用于所有订阅者,observeOn只能作用于后面的订阅者。

  • ParallelFlowable的runOn
    和subscribeOn类似,只是订阅者变成多条。

多线程

Flowable使用方法parallel()切换到ParallelFlowable,而ParallelFlowable使用方法serialize()切换到Flowable。
其中parallel可以指定并行度,在runOn后可并发调用parallel的subscribe,serialize收集多个订阅者的数据单线程推送给下游。

parallel()

学新通
学新通
学新通

学新通
学新通

学新通
学新通

serialize()

学新通
学新通
学新通
学新通
学新通
学新通

flatmap和map区别

flatmap用于获取一堆数据,类似一对多的关系,map用于数据转换,类似从一对一的关系。

示例代码

代码中FlowableBufferedCreate根据FlowableCreate改写的,把queue替换成blockingqueue,用于缓存满的时候堵塞上游数据推送,避免堆积到内存里面。
ParallelListenableFuture用于当ListenableFuture调用完成后推送给下游。

// 1. 多线程查询
        ParallelFlowable<MsgPushMgtDDO> queryParallelFlowable = Flowable.range(0, queryThreadSize)
                .parallel(2)
                // 切换到查询线程
                .runOn(Schedulers.from(msgpushQueryTaskExecutor, true))
                .flatMap((index) -> new FlowableBufferedCreate(emitter -> {
                            Integer querySize = queryThreadSize;
                            Integer queryIndex = index;
                            if (queryThreadSize == 1) {
                                querySize = null;
                                queryIndex = null;
                            }
                            // 调用查询
                            msgPushBO.query(shardId, null, queryIndex, querySize, (target) -> {
                                emitter.onNext(target);
                                redissionThreadLockTask.access();
                            }, metricRegistry);
                            // 触发完成
                            emitter.onComplete();
                        }, 256)
                );

        Flowable<MsgPushMgtDDO> queryFlowable = queryParallelFlowable
                // 并行转单行且发生异常后,延迟发送错误信息,一个分支错误,不影响其他分支继续查询
                .sequentialDelayError();

        // 2.多线程推送
        ParallelFlowable<MsgListenableFutureMap> handleParallelFlowable = queryFlowable
                // 推送是nio,这里不设置并发数量了
                .parallel()
                // 切换到推送线程
                .runOn(Schedulers.from(msgpushHandleTaskExecutor, false))
                .map((target) -> {
                    redissionThreadLockTask.access();
                    // 推送
                    ListenableFuture<NatMsgPushResultDTO> listenableFuture = msgPushBO.handleNioExecute(shardId, null, target, metricRegistry);
                    return new MsgListenableFutureMap(listenableFuture, target);
                });

        // future数据提取,推送线程future异常屏蔽
        ParallelListenableFuture<BatchResultHandle.BatchExcuteResult<MsgPushMgtDDO, Object>>
                parallelListenableFuture = new ParallelListenableFuture(handleParallelFlowable);

        // 推送结果组装成list
        Flowable<List<BatchResultHandle.BatchExcuteResult<MsgPushMgtDDO, Object>>> handleFlowable = parallelListenableFuture
                .sequential()
                .serialize()
                .buffer(100, () -> new ArrayList<>());

        final ListenableFutureDisposable listenableFutureDisposable = new ListenableFutureDisposable();

        // 3. 数据更新
        Disposable disposable = handleFlowable
                // 允许多线程更新
                .parallel(4, 1)
                .runOn(Schedulers.from(msgpushUpdateTaskExecutor, false))
                .sequential()
                .subscribe((result) -> {
                    redissionThreadLockTask.access();
                    msgPushBO.handleAfter(result, metricRegistry);
                    String msg = "分片ID【"   shardId   "】,当前已处理数据:"   parallelListenableFuture.getFutureDoneSize()   "条!";
                    listenableFutureCallback.info(msg);
                }, (error) -> {
                    redissionThreadLockTask.realease();
                    listenableFutureDisposable.setDone();
                    listenableFutureCallback.info("分片ID【"   shardId   "】执行异常:"   ThrowableUtil.stackTraceFullToString(error));
                }, () -> {
                    redissionThreadLockTask.realease();
                    listenableFutureDisposable.setDone();
                    String msg = "分片ID【"   shardId   "】执行成功,处理数据:"   parallelListenableFuture.getFutureDoneSize()   "条!"
                              "\r\n执行效率:\r\n"   report(metricRegistry);
                    listenableFutureCallback.info(msg);
                });
        listenableFutureDisposable.setDisposable(disposable);
        return listenableFutureDisposable;
学新通

总结

rxjava解决的是业务存在耗时操作导致整个请求堵塞,可以通过线程切分,耗时操作放入单独的线程中执行,对于webflux结合nio请求可以节约线程数量,提供系统吞吐率。对于后端业务,为耗时操作提供异步解决方法,框架封装很多常用操作,提高开发效率。rxjava其实在Android中用得广泛,可以让页面刷新和耗时业务分开,避免界面假死。
存在问题 不能根据下游吞吐量自动调整推送线程数量,只能根据经验或测试后,根据结果手动调整。下游如果能在不降低单线程效率情况下,提高推送线程数量提供吞吐量,上游可以尝试自动感知尝试。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgccehc
系列文章
更多 icon
同类精品
更多 icon
继续加载