Android的RxJava
最近准备梳理一下
Kotlin
,先复习一遍RxJava
思想,做个学习笔记 伪代码,整个脉络分为三个部分。
(一)使用场景
RxJava
是重量级、最复杂的框架(没有之一),JakeWharton 的巅峰之作,操作符非常丰富、特别庞大,学关键的内容,学思维方式,看PPT资料,学两遍。
为什么要学习RxJava?
改变思维(Rx思维)来提升效率,响应式编程/异步事件流编程
Rx思维:起点(分发事件)—>…—>终点(消费事件),中间不会断掉且可以做拦截,链条式思维
学习资料
-
思维扩展:Rx系列
【以下五部分难度逐步提升!!!】
一、核心思想(基础⭐)
传统思维:不同项目(程序员)实现有不同的思路,封装、Thread、线程池……
dialog—>Thread
/AsyncTask
/…—>Handler
—>UI
Rx思维/卡片式编程/观察者设计模式:
起点/被观察者/Observable—>订阅—>终点/观察者/Observer
封装线程调度,方便多处使用线程切换:
// 封装线程调度,UD:upstram/downloadstream
private final static <UD> ObservableTransformer<UD, UD> rxud {
return new ObservableTransformer<UD, UD>(){
@override
public ObservableSource<UD> apply(Observable<UD> upstream){
// 方法返回自己this,链式调用无限扩展,可以继续作死的调用
return upstream.subscribeOn(Schedulers.io())//给上层分配异步线程
.observerOn(AndroidSchedulers.mainThread())// 给下层分配主线程
.map(new Function<UD, UD>(){
Log.d(TAG, "balabala...");
return null;
});
}
}
}
事件触发:
public void reJavaDownloadImageAction(View view){
Observable.just(PATH)// ②起点
.map(new Function<String, Bitmap>(){// ③卡片式拦截
...// 请求服务器下载图片操作
return bitmap;
})
.map(new Function<Bitmap, Bitmap>(){// 需求:水印
...// 加水印操作
return newBitmap;
})
.map(new Function<Bitmap, Bitmap>(){// 需求:日志
Log.d(TAG, "balabala...");// Log
return newBitmap;
})
// 线程调度
//.subscribeOn(Schedulers.io())
//.observerOn(AndroidSchedulers.mainThread())
.compose(rxud())// 可以抽取封装起来
.subscribe(// 订阅,上层区域和下层区域
new Observer<Bitmap>() {// 终点
onSubScribe(Disposable d)// ①订阅开始,预备操作
onNext(Bitmap bitmap){// ④拿到事件,和起点类型一致
image.setImageBitmap(bitmap);
}
onError(Throwable e)// 错误事件
onComplete()// ⑤完成事件
}
);
}
二、RxJava
配合Retrofit
(常用⭐⭐)
常用的网络模型框架开发组合套装:Retrofit
(通过OkHttp
请求网络)—>RxJava
(仅处理返回数据)—>UI
注:Retrofit
不是网络框架,是个强大的封装框架,负责管理
网络请求接口
interface WanAndroidApi{
// 异步线程,耗时操作
// 总数据
@GET("project/tree/json")
Observable<ProjectBean> getProject();
// Item数据
//@GET("project/list/1/json?cid=294")
@GET("project/list/{pageIndex}/json")// 使用注解动态传参
Observable<ProjectItem> getProject(@Path("pageIndex")int pageIndex, @Query("cid")int cid);
}
具体工具类封装:
HttpUtil {
public static String BASE_URL="https://www.wanandroid.com/";
public static void setBaseUrl(String baseUrl) {
BASE_URL = baseUrl;
}
// 根据各种配置创建出Retrofit
public static Retrofit getOnlineCookieRetrofit() {
// OkHttp客户端
OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
HttpLoggingInterceptor logInterceptor = new HttpLoggingInterceptor(new HttpLogger());
logInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient okHttpClient = httpBuilder
.addInterceptor(logInterceptor)
.addNetworkInterceptor(new StethoInterceptor())
.readTimeout(1000, TimeUnit.SECONDS)
.connectTimeout(1000, TimeUnit.SECONDS)
.writeTimeout(1000, TimeUnit.SECONDS)
.build();
return new Retrofit.Builder().baseUrl(BASE_URL)
// step1:请求用OkHttp
.client(okHttpClient)
// step2:响应用RxJava
.addConverterFactory(GsonConverterFactory.create(new Gson()))// 添加一个json解析工具
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
}
Activity 中使用:
private WanAndroidApi api;
onCreate() {
api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
}
onClick(View view){
api.getProject()
.subscribeOn(Schedulers.io)//io专门处理大量的读取数据,异步类型
.observeOn(AndroidSchedulers.mainThread())
// 1. 标准使用方式
/*.subscribe(new Observer<ProjectBean>(){
onSubscribe()
onNext()
onError()
onComplete()
})*/
// 2. 简化全部内置函数
/*.subscribe(new Consumer<ProjectBean>(){
@override
public void accept(ProjectBean projectBean) Throws Exception {
//...
}
})*/
// 3. 使用lambda写法类型自动推导,代码简洁,不推荐,逻辑不是很清晰
.subscribe(data-> {
Log.d(TAG, "getProjectAction:" data);
});
}
onItemClick(View view){
// 点击列表Item,两层逻辑嵌套...
}
三、View
防抖(大公司/RxBinding
⭐⭐⭐)
定义:瞬间连续点击/自动化脚本……
onClick(View view){
antiShakeAction();
}
// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeAction() {
Button bt_anti_shake = findViewById();
RxView.click(bt_anti_shake)
.throttleFirst(2000, TimeUnit.MILLISECONDS)// 2秒
.subscribe(new Consumer<Object>(){
@override
public void accept(Object o) throws Exception {
api.getProject()
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<ProjectBean>(){
@override
public void accept(ProjectBean projectBean) throws Exception{
for(ProjectBean.DataBean dataBean : projectBean.getData()){
api.getProjectItem(1, dataBean.getId())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<ProjectItem>(){
@override
public void accept(ProjectItem projectItem) throws Exception{
Log.d(TAG, "accept:" projectItem);//UI操作
};
});
}
};
});
};
});
}
四、网络嵌套(实用⭐⭐⭐⭐)
下一步操作依赖于上一步操作得结果,先查询主数据,再查询具体的数据,如上
使用flatMap
操作符解决,自己会分发N多数据
onClick(View view){
//antiShakeAction();
antiShakeActionUpdate();
}
// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeActionUpdate() {
Button bt_anti_shake = findViewById();
RxView.click(bt_anti_shake)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.observeOn(Schedules.io())// 先给下面切换异步线程
.flatMap(new Function<Object, ObservableSource<ProjectBean>>(){
@override
public ObservableSource<ProjectBean> apply(Object o)throws Exception {
return api.getProject();// 返回主数据
}
})
.flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>(){
@override
public ObservableSource<ProjectBean> apply(ProjectBean projectBean)throws Exception {
// 自己搞一个发射器,发送多次,等价于上步的for循环
return Observable.formIterable(projectBean.getData());
}
})
.flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>(){
@override
public ObservableSource<ProjectItem> apply(ProjectItem projectItem)throws Exception {
return api.getProjectItem(1, projectItem.getId());
}
})
.observeOn(AndroidSchedulers.mainThread())// 再给下面切换主线程
.subscribe(new Consumer<ProjectItem projectItem>(){
@override
public void accept(ProjectItem projectItem) throws Exception {
// 显示数据
}
});
}
五、doOnNext
运用(难度最高⭐⭐⭐⭐⭐)
使用场景:主线程和异步线程之间频繁的线程切换(银行的业务……)
Retrofit
RxJava
实现模拟案例:
- show progressDialog
- 请求服务器注册操作
- 注册完成之后更新注册UI
- 马上登录服务器操作
- 登录完成之后更新登录UI
LoginRequest {}
RegisterRequest {}
LoginResponse {}
RegisterResponse {}
interface IRequestNetwork {
public Observable<RegisterResponse> registerAction(@Body RegisterRequest registerRequest);
public Observable<LoginResponse> loginAction(@Body LoginRequest loginRequest);
}
// 方式一,单独执行
request(View view) {
MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.registerAction(new RegisterRequest())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<RegisterResponse>() {
accept(RegisterResponse registerResponse) {
// 更新Register UI
}
});
MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.loginAction(new LoginRequest())
.compose(DownloadActivity.rxud())
.subscribe(new Consumer<LoginResponse>() {
accept(LoginResponse loginResponse) {
// 更新Login UI
}
});
}
// 方式二,优化为流式代码
Disposable disposable;// 为了销毁释放提取为全局,消除代码黄色警告
requestBetter(View view) {
disposable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.registerAction(new RegisterRequest())// step2
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<RegisterResponse>(){
accept(RegisterResponse registerResponse) {
// step3
}
})
.observeOn(Schedulers.io())
// step4
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>(){
@override
public ObservableSource<LoginResponse> apply(LoginResponse loginResponse)throws Exception {
Obsevable<LoginResponse> loginResponseObsevable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
.loginAction(new LoginRequest());
return loginResponseObsevable;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<LoginResponse>() {
onSubscribe(Disposable d) {// step1
progressDialog.show();
disposable = d;
}
onNext()// step5:更新Login UI
onError()
onComplete()// step6:dismiss progressDialog
});
}
onDestory() {
if(disposable!=null && !disposable.isDisposed()){
disposable.dispose();// 规范写法,必须释放
}
}
思考:说说自己对RxJava
核心思想的理解?
有一个起点和一个终点,起点开始流向我们的事件,把事件流向终点,在流向过程中可以添加拦截,拦截时可以对事件进行改变操作,终点只关心它上一个拦截,根据上一个拦截的变化而变化。
(二)模式与原理
源码、流程分析,观察者设计模式,map
变换操作符原理
// TODO 看书,关于观察者设计模式的部分、并发编程线程池
被观察者抽象层-−—−┐ 观察者抽象层
Observable ├—引用—→Observer
↑ | ↑
实现 | 实现
| | |
被观察者实现层【容器】 | 观察者实现层
ObservableImpl−-−–┘ ObserverImpl
RxJava
的Hook点,对整个项目全局(静态)的RxJava
的监听、拦截
RxJavaPlugins.onAssembly(...)
RxJava 1.x
预留onAssembly()
方法无操作
↓ 优化RxJava 2.x
调用setOnObservableAssembly()
方法赋值,优先执行
↓RxJava 3.x
↓
…
Hook机制:钩子程序,逆向
结论:很多操作符都会经过全局onAssembly
监听
RxJava
的观察者模式和标准的观察者设计模式不同
- 创建Observable
- 创建Observer
- 使用
subscribe()
订阅 - 使用
map()
(发送单个)/flatMap()
(发送多次)
洋葱模型:流程是U型结构封包裹、拆包裹
// step2:Observable创建源码,返回ObservableCreate,ObservableOnSubscribe自定义标签,判空 onAssembly(),并把自定义source丢进ObservableCreate
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("A");
}
})
// step4:得到ObservableMap进入subscribeActual(),层层包裹的洋葱模型,执行上一层的source.subscribe()把包裹传回到ObservableCreate里面,传进发射器Emitter里面。Function抽象块,具体传入的参数是实现块,内部进行类型变换。
.map(new Function<String, Bitmap>())
// step1:先看Observer的源码,最简单,interface 泛型
// step3:订阅过程ObservableCreate.subscribe()方法,传入自定义观察者(终点),健壮性校验,ObservableCreate.subscribeActual()抽象函数
.subscribe(new Observer<String>(){
onSubscribe()// subscribe后马上执行
onNext(String s)// 拿到上一个卡片流下来的数据,类型和起始泛型保持类型一致
onError()// 执行错误
onComplete()// 时间结束
});
标准的观察者模式中,有一个被观察者 N多观察者,被观察者发生改变,所有的观察者随之得到改变事件。
RxJava
中多个被观察者observable/map()
一个观察者Observer,订阅后马上触发。严格来讲RxJava
叫发布订阅模式,多出一个抽象层做转换,实质上不是观察者模式,达到的效果一样。
RxJava
中的装饰模型,U型结构概括不够完整:
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("A");
}
})
.map(new Function<String, Bitmap>())
.map(new Function<Integer, Boolean>())
// ...
.subscribe(new Observer<String>(){
onSubscribe()
onNext(String s)
onError()
onComplete()
});
由内而外:
- ObservableCreate
- ObservableMap
- ObservableMap
- …
- Subscribe
上下来回的流程:
- 装饰模型↓(订阅/触发导火线—>必须调用,否则后续无法执行)
- 封包裹↑
- 拆包裹/执行流程
onNext()
↓
背压使用/策略/原理:
生产的速度>消费的速度,导致的内存泄漏
使用Flowable
解决背压。
Single
是Observable
简化版,有局限性
思考:用自己的理解画出map变换操作符详细思路、流程图。
(三)原理与自定义操作符
线程切换(线程调度)原理、自定义RxView
操作符
异步事件流编程:
执行过程中可以随意的切换线程
RxAndroid
在客户端开发中必引入,配合使用
- RxJava 80%
- RxAndroid 20%
AndroidSchedulers.mainThread()
create()
最原始的方式,执行过程是可控的,just()
内部封装,不可控。
// step2:ObservableCreate调用subscribeOn()方法触发
.subscribeOn(
// step1
// 内部RxJavaPlugins.onIoScheduler(IO)进行Hook,唯一赋值函数为setIoSchedulerHandler()
// 策略机制,有多种策略……
Schedulers.io()// 耗时读异步操作
// Schedulers.newThread()// 开启新线程,频繁的
)
.subscribe()// step3:ObservableSubScribeOn调用subscribe()方法触发
onCreate() {
// IO传递进去的hook,
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>(){
apply(Scheduler scheduler) {
Log.d(TAG, "全局监听scheduler:" scheduler);
return scheduler;
}
});
// 初始化IO的hook
RxJavaPlugins.setInitIoSchedulerHandler(new Function<Callable<Scheduler>, Scheduler>(){
apply(Callable<Scheduler> schedulerCallable) {
Log.d(TAG, "全局监听schedulerCallable" schedulerCallable);
return schedulerCallable.call();
}
});
}
RxJavaPlugins.java
static {
...
IO = RxJavaPlugins.initIoScheduler(new IOTask());
NEW_THREAD = RxJavaPlugins.initIoScheduler(new NewThreadTask());
...
}
DEFAULT = new IoScheduler();
DEFAULT = new NewThreadScheduler();
结论: 经过了层层包装,最终交给线程池管控。
除了onSubscribe()
都是异步线程。
测试终点切回主线程
new Thread() {
run();
test();
}.start();
// 子线程做事
test() {
...
.observerOn(
AndroidSchedulers.mainThread()
)
}
通过Handler
切换回主线程,Looper.getMainLooper()
能保证100%在主线程
DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
如果当前是主线程就不做切换,节省开销。
自定义操作符RxView
:防抖控件
操作符就是函数。
RxView {
TAG = RxView.class.getSimpleName();
Observable<Object> clicks(View view) {
return ViewClickObservable(view);
}
}
ViewClickObservable extends Observable<Object> {
private final View view;
// 事件
private static final Object EVENT = new Object();
private static Object EVENT2;
public ViewClickObservable(View view) {
this.view = view;
EVENT2 = view;
}
subscribeActual(Observer<? super Object> observer) {
MyLisener myListener = new MyListener(view, observer);
observer.onSubscribe(myListener);
this.view.setOnClickListener(myListener);
}
static final class MyListener implements View.OnClickListener, Disposable {
private final View view;
private final Observer<Object> observer;
// 原子性 [理解AtomicBoolean](https://www.jianshu.com/p/8a44d4a819bc)
private final AtomicBoolean isDisposable = new AtomicBoolean();// 原子类型
public MyListener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
onClick(View v){
if(isDisposed() == false) {
observer.onNext(EVENT)
}
}
dispose(){// 中断
if(isDisposable.compareAndSet(false, true)){
// 主线程
if(Looper.myLooper() == Looper.getMainLooper()){
view.setOnClickListener(null);
} else {// 子线程通过Handler切换
/*new Hanler(Looper.getMainLooper()){
handleMessage(Message msg){
super.handleMessage(msg);
view.setOnClickListener(null);
}
}*/
// 不使用Handler,使用Rx中的代码风格
AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
run() {
view.setOnClickListener(null);
}
});
}
}
}
isDisposed(){
return isDisposable.get();
}
}
}
在 Activity 中使用:
onCreate() {
Button button = findViewById();
RxView.clicks(button)
.throttleFirst(2000, TimeUnit.SECONDS)
.subscribe(new Consumer<Object>(){
accept(Object o) {
Observable.create(new ObservableOnSubscribe<String>(){
subscribe(ObservableEmiiter<String> e){
e.onNext("aaaa");
}
})
.subscribe(new Consumer<Strign>(){
accept(String s){
Log.d(TAG, s);
}
});
}
});
}
思考:自己对RxJava
线程切换的理解?
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgbkfjf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01