• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Android的RxJava

武飞扬头像
小山研磨代码
帮助1

最近准备梳理一下Kotlin,先复习一遍RxJava思想,做个学习笔记 伪代码,整个脉络分为三个部分。

(一)使用场景

RxJava是重量级、最复杂的框架(没有之一),JakeWharton 的巅峰之作,操作符非常丰富、特别庞大,学关键的内容,学思维方式,看PPT资料,学两遍。

为什么要学习RxJava

改变思维(Rx思维)来提升效率,响应式编程/异步事件流编程

Rx思维:起点(分发事件)—>…—>终点(消费事件),中间不会断掉且可以做拦截,链条式思维

学习资料

【以下五部分难度逐步提升!!!】

一、核心思想(基础⭐)

传统思维:不同项目(程序员)实现有不同的思路,封装、Thread、线程池……

dialog—>Thread/AsyncTask/…—>Handler—>UI

Rx思维/卡片式编程/观察者设计模式:
起点/被观察者/Observable—>订阅—>终点/观察者/Observer

封装线程调度,方便多处使用线程切换:

// 封装线程调度,UD:upstram/downloadstream
private final static <UD> ObservableTransformer<UD, UD> rxud {
	return new ObservableTransformer<UD, UD>(){
		@override
		public ObservableSource<UD> apply(Observable<UD> upstream){
			// 方法返回自己this,链式调用无限扩展,可以继续作死的调用
			return upstream.subscribeOn(Schedulers.io())//给上层分配异步线程
				.observerOn(AndroidSchedulers.mainThread())// 给下层分配主线程
				.map(new Function<UD, UD>(){
					Log.d(TAG, "balabala...");
					return null;
				});
		}
	}
}

事件触发:

public void reJavaDownloadImageAction(View view){
	Observable.just(PATH)// ②起点
		.map(new Function<String, Bitmap>(){// ③卡片式拦截
			...// 请求服务器下载图片操作
			return bitmap;
		})
		.map(new Function<Bitmap, Bitmap>(){//  需求:水印
			...// 加水印操作
			return newBitmap;
		})
		.map(new Function<Bitmap, Bitmap>(){//  需求:日志
			Log.d(TAG, "balabala...");// Log
			return newBitmap;
		})
		// 线程调度
		//.subscribeOn(Schedulers.io())
		//.observerOn(AndroidSchedulers.mainThread())
		.compose(rxud())// 可以抽取封装起来
		.subscribe(// 订阅,上层区域和下层区域
			new Observer<Bitmap>() {// 终点
				onSubScribe(Disposable d)// ①订阅开始,预备操作
				onNext(Bitmap bitmap){// ④拿到事件,和起点类型一致
					image.setImageBitmap(bitmap);
				}
				onError(Throwable e)// 错误事件
				onComplete()// ⑤完成事件
			}
		);
}
学新通

二、RxJava配合Retrofit(常用⭐⭐)

常用的网络模型框架开发组合套装:
Retrofit(通过OkHttp请求网络)—>RxJava(仅处理返回数据)—>UI
注:Retrofit不是网络框架,是个强大的封装框架,负责管理

网络请求接口

interface WanAndroidApi{
	// 异步线程,耗时操作

	// 总数据
	@GET("project/tree/json")
	Observable<ProjectBean> getProject();

	// Item数据
	//@GET("project/list/1/json?cid=294")
	@GET("project/list/{pageIndex}/json")// 使用注解动态传参
	Observable<ProjectItem> getProject(@Path("pageIndex")int pageIndex, @Query("cid")int cid);
}

具体工具类封装:

HttpUtil {
	public static String BASE_URL="https://www.wanandroid.com/";
	
	public static void setBaseUrl(String baseUrl) {
		BASE_URL = baseUrl;
	}

	// 根据各种配置创建出Retrofit
	public static Retrofit getOnlineCookieRetrofit() {
		// OkHttp客户端
		OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
		HttpLoggingInterceptor logInterceptor = new HttpLoggingInterceptor(new HttpLogger());
		logInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
		OkHttpClient okHttpClient = httpBuilder
				.addInterceptor(logInterceptor)
				.addNetworkInterceptor(new StethoInterceptor())
				.readTimeout(1000, TimeUnit.SECONDS)
				.connectTimeout(1000, TimeUnit.SECONDS)
				.writeTimeout(1000, TimeUnit.SECONDS)
				.build();

		return new Retrofit.Builder().baseUrl(BASE_URL)
			// step1:请求用OkHttp
			.client(okHttpClient)
			// step2:响应用RxJava
			.addConverterFactory(GsonConverterFactory.create(new Gson()))// 添加一个json解析工具
			.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
			.build();
	}
}
学新通

Activity 中使用:

private WanAndroidApi api;

onCreate() {
	api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
}

onClick(View view){
	api.getProject()
		.subscribeOn(Schedulers.io)//io专门处理大量的读取数据,异步类型
		.observeOn(AndroidSchedulers.mainThread())
		// 1. 标准使用方式
		/*.subscribe(new Observer<ProjectBean>(){
			onSubscribe()
			onNext()
			onError()
			onComplete()
		})*/
		// 2. 简化全部内置函数
		/*.subscribe(new Consumer<ProjectBean>(){
			@override
			public void accept(ProjectBean projectBean) Throws Exception {
				//...
			}
		})*/
		// 3. 使用lambda写法类型自动推导,代码简洁,不推荐,逻辑不是很清晰
		.subscribe(data-> {
			Log.d(TAG, "getProjectAction:"   data);
		});
}

onItemClick(View view){
	// 点击列表Item,两层逻辑嵌套...
}
学新通

三、View防抖(大公司/RxBinding⭐⭐⭐)

定义:瞬间连续点击/自动化脚本……

onClick(View view){
	antiShakeAction();
}

// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeAction() {
	Button bt_anti_shake = findViewById();
	RxView.click(bt_anti_shake)
		.throttleFirst(2000, TimeUnit.MILLISECONDS)// 2秒
		.subscribe(new Consumer<Object>(){
			@override
			public void accept(Object o) throws Exception {
				api.getProject()
					.compose(DownloadActivity.rxud())
					.subscribe(new Consumer<ProjectBean>(){
						@override
						public void accept(ProjectBean projectBean) throws Exception{
							for(ProjectBean.DataBean dataBean : projectBean.getData()){
								api.getProjectItem(1, dataBean.getId())
									.compose(DownloadActivity.rxud())
									.subscribe(new Consumer<ProjectItem>(){
										@override
										public void accept(ProjectItem projectItem) throws Exception{
											Log.d(TAG, "accept:" projectItem);//UI操作
										};
									});
							}
						};
					});
			};
		});
}
学新通

四、网络嵌套(实用⭐⭐⭐⭐)

下一步操作依赖于上一步操作得结果,先查询主数据,再查询具体的数据,如上

使用flatMap操作符解决,自己会分发N多数据

onClick(View view){
	//antiShakeAction();
	antiShakeActionUpdate();
}

// 针对某控件2秒内点击20次,只响应一次。n层逻辑嵌套...
@SuppressLint("CheckResult")
antiShakeActionUpdate() {
	Button bt_anti_shake = findViewById();
	RxView.click(bt_anti_shake)
		.throttleFirst(2000, TimeUnit.MILLISECONDS)
		.observeOn(Schedules.io())// 先给下面切换异步线程
		.flatMap(new Function<Object, ObservableSource<ProjectBean>>(){
			@override
			public ObservableSource<ProjectBean> apply(Object o)throws Exception {
				return api.getProject();// 返回主数据
			}
		})
		.flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>(){
			@override
			public ObservableSource<ProjectBean> apply(ProjectBean projectBean)throws Exception {
				// 自己搞一个发射器,发送多次,等价于上步的for循环
				return Observable.formIterable(projectBean.getData());
			}
		})
		.flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>(){
			@override
			public ObservableSource<ProjectItem> apply(ProjectItem projectItem)throws Exception {
				return api.getProjectItem(1, projectItem.getId());
			}
		})
		.observeOn(AndroidSchedulers.mainThread())// 再给下面切换主线程
		.subscribe(new Consumer<ProjectItem projectItem>(){
			@override
			public void accept(ProjectItem projectItem) throws Exception {
				// 显示数据
			}
		});
}
学新通

五、doOnNext运用(难度最高⭐⭐⭐⭐⭐)

使用场景:主线程和异步线程之间频繁的线程切换(银行的业务……)

Retrofit RxJava 实现模拟案例:

  1. show progressDialog
  2. 请求服务器注册操作
  3. 注册完成之后更新注册UI
  4. 马上登录服务器操作
  5. 登录完成之后更新登录UI
LoginRequest {}
RegisterRequest {}
LoginResponse {}
RegisterResponse {}
interface IRequestNetwork {
	public Observable<RegisterResponse> registerAction(@Body RegisterRequest registerRequest);
	public Observable<LoginResponse> loginAction(@Body LoginRequest loginRequest);
}
// 方式一,单独执行
request(View view) {
	MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.registerAction(new RegisterRequest())
		.compose(DownloadActivity.rxud())
		.subscribe(new Consumer<RegisterResponse>() {
			accept(RegisterResponse registerResponse) {
				// 更新Register UI
			}
		});

	MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.loginAction(new LoginRequest())
		.compose(DownloadActivity.rxud())
		.subscribe(new Consumer<LoginResponse>() {
			accept(LoginResponse loginResponse) {
				// 更新Login UI
			}
		});
}

// 方式二,优化为流式代码
Disposable disposable;// 为了销毁释放提取为全局,消除代码黄色警告
requestBetter(View view) {
	disposable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
		.registerAction(new RegisterRequest())// step2
		.subscribeOn(Schedulers.io())
		.observeOn(AndroidSchedulers.mainThread())
		.doOnNext(new Consumer<RegisterResponse>(){
			accept(RegisterResponse registerResponse) {
				// step3
			}
		})
		.observeOn(Schedulers.io())
		// step4
		.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>(){
			@override
			public ObservableSource<LoginResponse> apply(LoginResponse loginResponse)throws Exception {
				Obsevable<LoginResponse> loginResponseObsevable = MyRetrofit.createRetrofit().create(IRequestNetwork.class)
					.loginAction(new LoginRequest());
				return loginResponseObsevable;
			}
		})
		.subscribeOn(AndroidSchedulers.mainThread())
		.subscribe(new Observer<LoginResponse>() {
			onSubscribe(Disposable d) {// step1
				progressDialog.show();
				disposable = d;
			}
			onNext()// step5:更新Login UI
			onError()
			onComplete()// step6:dismiss progressDialog
		});
}

onDestory() {
	if(disposable!=null && !disposable.isDisposed()){
		disposable.dispose();// 规范写法,必须释放
	}
}
学新通

思考:说说自己对RxJava核心思想的理解?

有一个起点和一个终点,起点开始流向我们的事件,把事件流向终点,在流向过程中可以添加拦截,拦截时可以对事件进行改变操作,终点只关心它上一个拦截,根据上一个拦截的变化而变化。


(二)模式与原理

源码、流程分析,观察者设计模式,map变换操作符原理

// TODO 看书,关于观察者设计模式的部分、并发编程线程池

被观察者抽象层-−—−┐ 观察者抽象层
Observable ├—引用—→Observer
↑ | ↑
实现 | 实现
| | |
被观察者实现层【容器】 | 观察者实现层
ObservableImpl−-−–┘ ObserverImpl

RxJava的Hook点,对整个项目全局(静态)的RxJava的监听、拦截

RxJavaPlugins.onAssembly(...)

RxJava 1.x 预留onAssembly()方法无操作
↓ 优化
RxJava 2.x 调用setOnObservableAssembly()方法赋值,优先执行

RxJava 3.x

Hook机制:钩子程序,逆向

结论:很多操作符都会经过全局onAssembly监听

RxJava的观察者模式和标准的观察者设计模式不同

  1. 创建Observable
  2. 创建Observer
  3. 使用subscribe()订阅
  4. 使用map()(发送单个)/flatMap()(发送多次)

洋葱模型:流程是U型结构封包裹、拆包裹

// step2:Observable创建源码,返回ObservableCreate,ObservableOnSubscribe自定义标签,判空 onAssembly(),并把自定义source丢进ObservableCreate
Observable.create(new ObservableOnSubscribe<String>(){
		subscribe(ObservableEmitter<String> emitter) {
			emitter.onNext("A");
		}
	})
	// step4:得到ObservableMap进入subscribeActual(),层层包裹的洋葱模型,执行上一层的source.subscribe()把包裹传回到ObservableCreate里面,传进发射器Emitter里面。Function抽象块,具体传入的参数是实现块,内部进行类型变换。
	.map(new Function<String, Bitmap>())
	// step1:先看Observer的源码,最简单,interface 泛型
	// step3:订阅过程ObservableCreate.subscribe()方法,传入自定义观察者(终点),健壮性校验,ObservableCreate.subscribeActual()抽象函数
	.subscribe(new Observer<String>(){
		onSubscribe()// subscribe后马上执行
		onNext(String s)// 拿到上一个卡片流下来的数据,类型和起始泛型保持类型一致
		onError()// 执行错误
		onComplete()// 时间结束
	});
学新通

标准的观察者模式中,有一个被观察者 N多观察者,被观察者发生改变,所有的观察者随之得到改变事件。

RxJava中多个被观察者observable/map() 一个观察者Observer,订阅后马上触发。严格来讲RxJava发布订阅模式,多出一个抽象层做转换,实质上不是观察者模式,达到的效果一样。

RxJava中的装饰模型,U型结构概括不够完整:

Observable.create(new ObservableOnSubscribe<String>(){
		subscribe(ObservableEmitter<String> emitter) {
			emitter.onNext("A");
		}
	})
	.map(new Function<String, Bitmap>())
	.map(new Function<Integer, Boolean>())
	// ...
	.subscribe(new Observer<String>(){
		onSubscribe()
		onNext(String s)
		onError()
		onComplete()
	});

由内而外:

  • ObservableCreate
  • ObservableMap
  • ObservableMap
  • Subscribe

上下来回的流程:

  • 装饰模型↓(订阅/触发导火线—>必须调用,否则后续无法执行)
  • 封包裹↑
  • 拆包裹/执行流程onNext()

背压使用/策略/原理:

生产的速度>消费的速度,导致的内存泄漏
使用Flowable解决背压。

SingleObservable简化版,有局限性

思考:用自己的理解画出map变换操作符详细思路、流程图。


(三)原理与自定义操作符

线程切换(线程调度)原理、自定义RxView操作符

异步事件流编程:

执行过程中可以随意的切换线程

RxAndroid 在客户端开发中必引入,配合使用

  • RxJava 80%
  • RxAndroid 20% AndroidSchedulers.mainThread()

create()最原始的方式,执行过程是可控的,just()内部封装,不可控。

// step2:ObservableCreate调用subscribeOn()方法触发
.subscribeOn(
	// step1
	// 内部RxJavaPlugins.onIoScheduler(IO)进行Hook,唯一赋值函数为setIoSchedulerHandler()
	// 策略机制,有多种策略……
	Schedulers.io()// 耗时读异步操作
	// Schedulers.newThread()// 开启新线程,频繁的
)
.subscribe()// step3:ObservableSubScribeOn调用subscribe()方法触发

onCreate() {
	// IO传递进去的hook,
	RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>(){
		apply(Scheduler scheduler) {
			Log.d(TAG, "全局监听scheduler:" scheduler);
			return scheduler;
		}
	});

	// 初始化IO的hook
	RxJavaPlugins.setInitIoSchedulerHandler(new Function<Callable<Scheduler>, Scheduler>(){
		apply(Callable<Scheduler> schedulerCallable) {
			Log.d(TAG, "全局监听schedulerCallable" schedulerCallable);
			return schedulerCallable.call();
		}
	});
}
学新通

RxJavaPlugins.java

static {
	...
	IO = RxJavaPlugins.initIoScheduler(new IOTask());
	NEW_THREAD = RxJavaPlugins.initIoScheduler(new NewThreadTask());
	...
}
DEFAULT = new IoScheduler();
DEFAULT = new NewThreadScheduler();

结论: 经过了层层包装,最终交给线程池管控。

除了onSubscribe()都是异步线程。

测试终点切回主线程

new Thread() {
	run();
	test();
}.start();

// 子线程做事
test() {
	...
	.observerOn(
		AndroidSchedulers.mainThread()
	)
}

通过Handler切换回主线程,Looper.getMainLooper()能保证100%在主线程

DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

如果当前是主线程就不做切换,节省开销。

自定义操作符RxView:防抖控件

操作符就是函数。

RxView {
	TAG = RxView.class.getSimpleName();

	Observable<Object> clicks(View view) {
		return ViewClickObservable(view);
	}
}
ViewClickObservable extends Observable<Object> {
	private final View view;

	// 事件
	private static final Object EVENT = new Object();
	private static Object EVENT2;

	public ViewClickObservable(View view) {
		this.view = view;
		EVENT2 = view;
	}

	subscribeActual(Observer<? super Object> observer) {
		MyLisener myListener = new MyListener(view, observer);
		observer.onSubscribe(myListener);
		this.view.setOnClickListener(myListener);
	}

	static final class MyListener implements View.OnClickListener, Disposable {
		private final View view;
		private final Observer<Object> observer;
		// 原子性 [理解AtomicBoolean](https://www.jianshu.com/p/8a44d4a819bc)
		private final AtomicBoolean isDisposable = new AtomicBoolean();// 原子类型

		public MyListener(View view, Observer<? super Object> observer) {
			this.view = view;
			this.observer = observer;
		}

		onClick(View v){
			if(isDisposed() == false) {
				observer.onNext(EVENT)
			}
		}
		dispose(){// 中断
			if(isDisposable.compareAndSet(false, true)){
				// 主线程
				if(Looper.myLooper() == Looper.getMainLooper()){
					view.setOnClickListener(null);
				} else {// 子线程通过Handler切换
					/*new Hanler(Looper.getMainLooper()){
						handleMessage(Message msg){
							super.handleMessage(msg);
							view.setOnClickListener(null);
						}
					}*/
					// 不使用Handler,使用Rx中的代码风格
					AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
						run() {
							view.setOnClickListener(null);
						}
					});
				}
			}
		}
		isDisposed(){
			return isDisposable.get();
		}
	}
}
学新通

在 Activity 中使用:

onCreate() {
	Button button = findViewById();

	RxView.clicks(button)
		.throttleFirst(2000, TimeUnit.SECONDS)
		.subscribe(new Consumer<Object>(){
			accept(Object o) {
				Observable.create(new ObservableOnSubscribe<String>(){
					subscribe(ObservableEmiiter<String> e){
						e.onNext("aaaa");
					}
				})
				.subscribe(new Consumer<Strign>(){
					accept(String s){
						Log.d(TAG, s);
					}
				});
			}
		});
}
学新通

思考:自己对RxJava线程切换的理解?

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbkfjf
系列文章
更多 icon
同类精品
更多 icon
继续加载