Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (二)
2.2 使用 RxJava 响应式框架的实践
RxJava 库 是 Reactive Extensions 的 Java虚拟机实现,近似于观察者模式,迭代器模式,函数式编程的组合。
2.2.1 响应式流 = 观察者 迭代器
通过事件分离生产者和消费者。
迭代器模式:不希望生产者在消费者出现之前生产数据的场景。
public interface Iterator<T> {
T next();
boolean hasNext();
}
// 结合观察者
public interface RxObserver<T> {
void onNext(T next); // 通知新值
void onComplete(); // 通知结束
void onError(Exception e); // 通知出错
}
RxObserver 类似于前面介绍的观察者模式中的 Observer。
对于订阅的内容和订阅者,我们可以定义 Observale 和 Subcriber
- Observable : 类似于观察者模式中的Subject,可观察的事件源,他会发出元素,并且有流转化方法和流初始化工厂方法。
- Subscriber :抽象类,用于实现 Observer 观察者的接口,并且消费元素,实现类的基础
同时,我们定义 Subcription 来控制 Observable 和 Subscriber 之间的运行时关系。Subscription 可以检查订阅状态,并且在必要的时候取消订阅。
生产者和消费者之间的Subcription契约如下:
---------| — onNext —> | -----------
| Observable | – onComplete -> | Observer |
--------- | — onError —> |------------
根据RxJava中的规则,Observable 事件源可以发送0-N个元素,通过声明成功和引发错误来指示结束。
所以 Observable 事件源会对相关联的 Subscriber 多次调用 onNext,最后调用 onComplete 或 onError。
2.2.2 生产和消费流数据
把 Observable 视为一个事件生成器,在订阅时会给订阅者传播事件。
Observable<String> observable = Obervable.create(
new Observable.OnSubcribe<String>(){
@Override
public void call(Subcriber<? super String> sub){
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
}
);
// 当订阅者出现时立刻会触发 call()
// lambda写法
Observable<String> observable = Obervable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);
Observable 是可重用的,每个订阅者在订阅之后就会立刻收到这个消息事件。RxJava 1.2.7开始,Observable的创建因为不安全被弃用,因为它可能生成太多元素导致订阅者负载过多。即这种方法不支持背压。
对应的订阅者代码如下:
Subcriber<String> subscriber = new Subcriber<String>(){
@Override
public void onNext(String s){
System.out.println("receive: " s);
}
@Override
public void onCompleted(){
System.out.println("Done.");
}
@Override
public void onError(Throwable e){
System.err.println(e);
}
}
所以现在订阅者 Subcriber 和 消息源 Observable 可以一起工作了。Subcriber 必须要实现Observer观察者的方法,定义 onNext 来响应新事件,定义 onCompleted 来响应流完成,定义 onError 来响应错误。
最后只要在 Observable 添加订阅关系即可完成这个响应式demo程序。
observable.subscribe(subcriber);
// output:
// receive: Hello, reactive world!
// Done.
// 更简化的lambda写法
Observable<String> observable = Obervable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);
observable.subscribe(
s -> System.out.println("receive: " s),
() -> System.out.println("Done"),
System.err::println;
);
// output:
// receive: Hello, reactive world!
// Done.
Rxjava 库对于创建消息源 Observable 实例很灵活。
Observable.just("1","2","3","4");
Observable.from(new String[]{"A", "B", "C", "D"});
Observable.from(Collections.emptyList());
Observale<String> msg1 = Observable.fromCallable(()->"hello-");
Future<String> future = Executors.newCachedThreadPool().submit(() -> "world");
Observale<String> msg2 = Observable.from(future);
//组合多个流 处理顺序按照参数顺序
Observale<String> msg = Observable.concat(msg1, msg2, Observable.just("."));
msg.forEach(System.out::print);
// output: hello-world.
虽然有onError信号来处理出错,看起来不用去为异常定义处理程序,但是在发生错误的时候,默认的 Subscriber 实现仍然会抛出一个 rx.exceptions.OnErrorNotImplementedException
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgcaake
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13