rxjava理解
简单介绍
rxjava中有Flowable、Maybe、Observable、Single、Completable 5种使用途径,其中Flowable实现reactivestreams接口,是jdk9标准接口,支持背压、并发模式。
反应式编程特点就是在订阅时才执行具体业务代码。
接口介绍
Publisher消息提供发布者,提供订阅方法,发布者定义好后,消费者通过订阅通知发布者可以准备消息了。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber消息订阅者,提供4个方法
public interface Subscriber<T> {
/**
* 发布者可通过该方法,通知下游订阅者收到订阅了,可以通过Subscription 取消或者请求消息。
*/
public void onSubscribe(Subscription s);
/**
* 发布者被订阅或被Subscription请求时,触发订阅者的onNext方法,发布消息。
*/
public void onNext(T t);
/**
* 执行业务过程中发生异常,通知消费者,一般发生异常会中断后续onNext流程
*/
public void onError(Throwable t);
/**
* 通知下游业务执行完成
*/
public void onComplete();
}
Subscription提供reqeust和cancel 2个方法
public interface Subscription {
/**
* 让上游发布消息
*/
public void request(long n);
/**
* 取消消息推送
*/
public void cancel();
}
Processor继承Subscriber、Publisher。
Flowable递归调用
便于链式调用,所有Flowable相关的类继承Flowable,实现subscribeActual,在Flowable中融合所有类的调度方法。
protected abstract void subscribeActual(@NonNull Subscriber<@NonNull ? super T> subscriber);
链式调用结构
左边先调用的发布者在最里面,右边订阅者先调用的在最外面,因为先执行构造定义,在通过订阅串联所有发布者。
Rx多数代码的发布者和订阅者在同一个类中实现,发布者被订阅(subscribe)时,通过包裹订阅者接收上游的onSubscribe,可实现并发、缓存、重试、条件判断等功能。这样就构成,先链式调用的发布者,后被订阅,先收到onSubscribe。
部分代码理解
假设都是链式调用,先被调用的方法(发布者方法)代表前面,后被调用的方法代表后面。发布者称为上游,订阅者称为下游。
线程切换
需要区分Flowable和ParallelFlowable,前者用于单线程,后者用于多线程,多线程和单线程有方法可以动态切换。
- Flowable的subscribeOn和observeOn
subscribeOn 表示,首次被调用起到最上游的request请求和所有订阅者的onNext方法都由该线程执行。
构造函数的入参source为先链式调用的上游。
切换线程后执行上游的订阅,触发上游调用onSubscribe、onNext等方法。如果上游不自动推送消息下来,在上游调用onSubscribe时,触发上游的reqeuest,使上游推送消息到该类。
onNext直接转发数据。
observeOn 表示,从被调用起后续所有的onNext方法都由该线程执行。
总结 从上面可以看出,subscribeOn不论在哪里调用,可以作用于所有订阅者,observeOn只能作用于后面的订阅者。
- ParallelFlowable的runOn
和subscribeOn类似,只是订阅者变成多条。
多线程
Flowable使用方法parallel()切换到ParallelFlowable,而ParallelFlowable使用方法serialize()切换到Flowable。
其中parallel可以指定并行度,在runOn后可并发调用parallel的subscribe,serialize收集多个订阅者的数据单线程推送给下游。
parallel()
serialize()
flatmap和map区别
flatmap用于获取一堆数据,类似一对多的关系,map用于数据转换,类似从一对一的关系。
示例代码
代码中FlowableBufferedCreate根据FlowableCreate改写的,把queue替换成blockingqueue,用于缓存满的时候堵塞上游数据推送,避免堆积到内存里面。
ParallelListenableFuture用于当ListenableFuture调用完成后推送给下游。
// 1. 多线程查询
ParallelFlowable<MsgPushMgtDDO> queryParallelFlowable = Flowable.range(0, queryThreadSize)
.parallel(2)
// 切换到查询线程
.runOn(Schedulers.from(msgpushQueryTaskExecutor, true))
.flatMap((index) -> new FlowableBufferedCreate(emitter -> {
Integer querySize = queryThreadSize;
Integer queryIndex = index;
if (queryThreadSize == 1) {
querySize = null;
queryIndex = null;
}
// 调用查询
msgPushBO.query(shardId, null, queryIndex, querySize, (target) -> {
emitter.onNext(target);
redissionThreadLockTask.access();
}, metricRegistry);
// 触发完成
emitter.onComplete();
}, 256)
);
Flowable<MsgPushMgtDDO> queryFlowable = queryParallelFlowable
// 并行转单行且发生异常后,延迟发送错误信息,一个分支错误,不影响其他分支继续查询
.sequentialDelayError();
// 2.多线程推送
ParallelFlowable<MsgListenableFutureMap> handleParallelFlowable = queryFlowable
// 推送是nio,这里不设置并发数量了
.parallel()
// 切换到推送线程
.runOn(Schedulers.from(msgpushHandleTaskExecutor, false))
.map((target) -> {
redissionThreadLockTask.access();
// 推送
ListenableFuture<NatMsgPushResultDTO> listenableFuture = msgPushBO.handleNioExecute(shardId, null, target, metricRegistry);
return new MsgListenableFutureMap(listenableFuture, target);
});
// future数据提取,推送线程future异常屏蔽
ParallelListenableFuture<BatchResultHandle.BatchExcuteResult<MsgPushMgtDDO, Object>>
parallelListenableFuture = new ParallelListenableFuture(handleParallelFlowable);
// 推送结果组装成list
Flowable<List<BatchResultHandle.BatchExcuteResult<MsgPushMgtDDO, Object>>> handleFlowable = parallelListenableFuture
.sequential()
.serialize()
.buffer(100, () -> new ArrayList<>());
final ListenableFutureDisposable listenableFutureDisposable = new ListenableFutureDisposable();
// 3. 数据更新
Disposable disposable = handleFlowable
// 允许多线程更新
.parallel(4, 1)
.runOn(Schedulers.from(msgpushUpdateTaskExecutor, false))
.sequential()
.subscribe((result) -> {
redissionThreadLockTask.access();
msgPushBO.handleAfter(result, metricRegistry);
String msg = "分片ID【" shardId "】,当前已处理数据:" parallelListenableFuture.getFutureDoneSize() "条!";
listenableFutureCallback.info(msg);
}, (error) -> {
redissionThreadLockTask.realease();
listenableFutureDisposable.setDone();
listenableFutureCallback.info("分片ID【" shardId "】执行异常:" ThrowableUtil.stackTraceFullToString(error));
}, () -> {
redissionThreadLockTask.realease();
listenableFutureDisposable.setDone();
String msg = "分片ID【" shardId "】执行成功,处理数据:" parallelListenableFuture.getFutureDoneSize() "条!"
"\r\n执行效率:\r\n" report(metricRegistry);
listenableFutureCallback.info(msg);
});
listenableFutureDisposable.setDisposable(disposable);
return listenableFutureDisposable;
总结
rxjava解决的是业务存在耗时操作导致整个请求堵塞,可以通过线程切分,耗时操作放入单独的线程中执行,对于webflux结合nio请求可以节约线程数量,提供系统吞吐率。对于后端业务,为耗时操作提供异步解决方法,框架封装很多常用操作,提高开发效率。rxjava其实在Android中用得广泛,可以让页面刷新和耗时业务分开,避免界面假死。
存在问题 不能根据下游吞吐量自动调整推送线程数量,只能根据经验或测试后,根据结果手动调整。下游如果能在不降低单线程效率情况下,提高推送线程数量提供吞吐量,上游可以尝试自动感知尝试。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgccehc
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01